blob: 6836072d73903449d1d9eafe0f6f4a173ac28f28 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ee.test.utils;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.URI;
import java.util.concurrent.Semaphore;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.uima.UIMAFramework;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.jms.error.handler.BrokerConnectionException;
import org.apache.uima.util.Level;
public class ActiveMQSupport extends TestCase {
private static final Class CLASS_NAME = ActiveMQSupport.class;
protected static final String DEFAULT_BROKER_URL_KEY="DefaultBrokerURL";
protected static final String DEFAULT_BROKER_URL_KEY_2="DefaultBrokerURL2";
protected static final String DEFAULT_HTTP_BROKER_URL_KEY="DefaultHttpBrokerURL";
protected static final String DEFAULT_HTTP_BROKER_URL_KEY_2="DefaultHttpBrokerURL2";
protected static final int DEFAULT_BROKER_PORT=61617;
protected static final int DEFAULT_BROKER_PORT_2=61620;
protected static final int DEFAULT_HTTP_PORT = 18888;
protected static final int DEFAULT_HTTP_PORT2 = 18890;
protected static BrokerService broker;
protected String uri = null;
protected static ThreadGroup brokerThreadGroup = null;
protected static TransportConnector tcpConnector = null;
protected static final String relativePath = "src" + System.getProperty("file.separator")
+ "test" + System.getProperty("file.separator") + "resources"
+ System.getProperty("file.separator") + "deployment";
protected static final String relativeDataPath = "src" + System.getProperty("file.separator")
+ "test" + System.getProperty("file.separator") + "resources"
+ System.getProperty("file.separator") + "data";
protected static TransportConnector httpConnector = null;
public static Semaphore brokerSemaphore = new Semaphore(1);
protected synchronized void setUp() throws Exception {
super.setUp();
broker = createBroker();
broker.start();
broker.setMasterConnectorURI(uri);
addHttpConnector(DEFAULT_HTTP_PORT);
if ( System.getProperty(DEFAULT_BROKER_URL_KEY) != null) {
System.clearProperty(DEFAULT_BROKER_URL_KEY);
}
System.setProperty(DEFAULT_BROKER_URL_KEY, broker.getMasterConnectorURI());
if ( System.getProperty(DEFAULT_HTTP_BROKER_URL_KEY) != null) {
System.clearProperty(DEFAULT_HTTP_BROKER_URL_KEY);
}
System.setProperty(DEFAULT_HTTP_BROKER_URL_KEY, httpConnector.getConnectUri().toString());
}
protected void cleanBroker( BrokerService targetBroker) throws Exception {
// Remove messages from all queues
targetBroker.deleteAllMessages();
org.apache.activemq.broker.Connection[] connections = targetBroker.getRegionBroker().getClients();
for( org.apache.activemq.broker.Connection connection : connections) {
try {
connection.stop();
} catch( Exception e) {
e.printStackTrace();
}
}
ActiveMQDestination[] destinations = targetBroker.getRegionBroker().getDestinations();
if ( destinations != null ) {
for( ActiveMQDestination destination: destinations ) {
if ( !destination.isTopic() ) {
targetBroker.removeDestination(destination);
}
}
}
}
protected String addHttpConnector(int aDefaultPort) throws Exception {
return addHttpConnector(broker, aDefaultPort);
}
protected String addHttpConnector(BrokerService aBroker, int aDefaultPort) throws Exception {
boolean found = false;
while( !found ) {
try {
httpConnector = addConnector(aBroker, "http",aDefaultPort);
// Use reflection to determine if the AMQ version is at least 5.2. If it is, we must
// plug in a broker to the httpConnector otherwise we get NPE when starting the connector.
// AMQ version 4.1.1 doesn't exhibit this problem.
try {
Method m = httpConnector.getClass().getDeclaredMethod("setBrokerService", new Class[] {BrokerService.class});
m.invoke(httpConnector, aBroker);
} catch ( NoSuchMethodException e) {
// Ignore, this is not AMQ 5.2
}
System.out.println("Adding HTTP Connector:" + httpConnector.getConnectUri());
httpConnector.start();
return httpConnector.getUri().toString();
} catch ( BindException e) {
aDefaultPort++;
} catch ( IOException e) {
if ( e.getCause() != null && e.getCause() instanceof BindException ) {
aDefaultPort++;
} else {
throw new BrokerConnectionException("Unexpected Exception While Connecting to Broker with URL:"+uri+"\n"+e);
}
}
}
throw new BrokerConnectionException("Unable to acquire Open Port for HTTPConnector");
}
protected String getHttpURI() throws Exception {
while ( httpConnector == null ) {
synchronized(this) {
this.wait(100);
}
}
return httpConnector.getConnectUri().toString();
}
protected void removeQueue(String aQueueName) throws Exception {
httpConnector.stop();
}
protected void removeHttpConnector() throws Exception {
httpConnector.stop();
broker.removeConnector(httpConnector);
}
protected TransportConnector addConnector(BrokerService aBroker, String type, int basePort)
throws BrokerConnectionException{
boolean found = false;
TransportConnector transportConnector = null;
while( !found ) {
try {
String uri = type+"://localhost:" + basePort;
transportConnector = aBroker.addConnector(uri);
found = true;
} catch ( BindException e) {
basePort++;
} catch ( IOException e) {
if ( e.getCause() != null && e.getCause() instanceof BindException ) {
basePort++;
} else {
throw new BrokerConnectionException("Unexpected Exception While Connecting to Broker with URL:"+uri+"\n"+e);
}
} catch( Exception e) {
throw new BrokerConnectionException("Unexpected Exception While Connecting to Broker with URL:"+uri+"\n"+e);
}
}
return transportConnector;
}
protected String getBrokerUri() {
return uri;
}
protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(uri);
}
protected Connection getConnection() throws Exception {
return createConnectionFactory().createConnection();
}
public BrokerService createBroker() throws Exception {
return createBroker(DEFAULT_BROKER_PORT, true, false);
}
protected BrokerService createBroker(int port, boolean useJmx, boolean secondaryBroker) throws Exception {
String hostName = "localhost";
BrokerService broker =
BrokerFactory.createBroker(new URI("broker:()/" + hostName + "?persistent=false"));
tcpConnector = addConnector(broker, "tcp",port);
uri = tcpConnector.getUri().toString();
System.out.println(">>>> Starting Broker With URL:" + uri);
if ( secondaryBroker ) {
broker.getManagementContext().setJmxDomainName(broker.getManagementContext().getJmxDomainName()+".test");
tcpConnector.setName(DEFAULT_BROKER_URL_KEY_2);
} else {
tcpConnector.setName(DEFAULT_BROKER_URL_KEY);
}
broker.setUseJmx(useJmx);
PolicyEntry policy = new PolicyEntry();
policy.setDeadLetterStrategy(new SharedDeadLetterStrategy());
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
broker.setPersistent(false);
broker.setUseShutdownHook(true);
broker.setUseLoggingForShutdownErrors(false);
try {
Method method = broker.getClass().getDeclaredMethod("setSchedulerSupport", new Class[] {Boolean.TYPE});
method.invoke(broker, new Object[] {Boolean.FALSE});
} catch( NoSuchMethodException e) {
// ignore
}
return broker;
}
protected BrokerService setupSecondaryBroker(boolean addProperty) throws Exception {
System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
BrokerService broker2 = createBroker(DEFAULT_BROKER_PORT_2, true, true);
broker2.start();
if ( addProperty ) {
System.setProperty("BrokerURL", broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
}
return broker2;
}
protected void stopBroker() throws Exception {
if (broker != null) {
System.out.println(">>> Stopping Broker");
if (tcpConnector != null) {
tcpConnector.stop();
broker.removeConnector(tcpConnector);
System.out.println("Broker Connector:" + tcpConnector.getUri().toString() + " is stopped");
}
removeHttpConnector();
broker.deleteAllMessages();
broker.stop();
broker.waitUntilStopped();
System.out.println(">>> Broker Stopped");
}
}
protected synchronized void tearDown() throws Exception {
super.tearDown();
System.clearProperty("activemq.broker.jmx.domain");
System.clearProperty("BrokerURL");
stopBroker();
}
}