blob: 904f55de78efde2de6167ec74e8090c955ca0e47 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ee.test.utils;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.URI;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Semaphore;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.management.MBeanServer;
import javax.management.ObjectInstance;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.camel.Exchange;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.uima.UIMAFramework;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.jms.error.handler.BrokerConnectionException;
import org.apache.uima.util.Level;
import org.junit.After;
import org.junit.Before;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.util.ErrorHandler;
public class ActiveMQSupport extends TestCase {
private static final Class CLASS_NAME = ActiveMQSupport.class;
protected static final String DEFAULT_BROKER_URL_KEY="DefaultBrokerURL";
protected static final String DEFAULT_BROKER_URL_KEY_2="DefaultBrokerURL2";
protected static final String DEFAULT_HTTP_BROKER_URL_KEY="DefaultHttpBrokerURL";
protected static final String DEFAULT_HTTP_BROKER_URL_KEY_2="DefaultHttpBrokerURL2";
protected static final int DEFAULT_BROKER_PORT=61617;
protected static final int DEFAULT_BROKER_PORT_2=61620;
protected static final int DEFAULT_HTTP_PORT = 18888;
protected static final int DEFAULT_HTTP_PORT2 = 18890;
protected static BrokerService broker;
/**
* Is set to uri of started broker for the TCP connection
*/
protected String uri = null;
protected static ThreadGroup brokerThreadGroup = null;
protected static TransportConnector tcpConnector = null;
protected static final String relativePath = "src" + System.getProperty("file.separator")
+ "test" + System.getProperty("file.separator") + "resources"
+ System.getProperty("file.separator") + "deployment";
protected static final String resourceDirPath = "src" + System.getProperty("file.separator")
+ "test" + System.getProperty("file.separator") + "resources";
protected static final String relativeDataPath = "src" + System.getProperty("file.separator")
+ "test" + System.getProperty("file.separator") + "resources"
+ System.getProperty("file.separator") + "data";
protected static TransportConnector httpConnector = null;
public static Semaphore brokerSemaphore = new Semaphore(1);
@Before
public synchronized void setUp() throws Exception {
super.setUp();
//BasicConfigurator.configure();
/*
ConsoleAppender console = new ConsoleAppender(); //create appender
//configure the appender
String PATTERN = "%d [%p|%c|%C{1}] %m%n";
console.setLayout(new PatternLayout(PATTERN));
console.setThreshold(org.apache.log4j.Level.WARN);
console.activateOptions();
//add appender to any Logger (here is root)
Logger.getRootLogger().addAppender(console);
*/
broker = createBroker(); // sets uri
/*
broker.setUseJmx(false);
if ( broker.isUseJmx()) {
broker.getManagementContext().setConnectorPort(1098);
}
*/
SystemUsage su = new SystemUsage();
MemoryUsage mu = new MemoryUsage();
mu.setPercentOfJvmHeap(50);
su.setMemoryUsage(mu);
broker.setSystemUsage(su);
broker.start();
//System.out.println("Broker Version:"+broker.getBroker().)
// broker.setMasterConnectorURI(uri);
addHttpConnector(DEFAULT_HTTP_PORT);
if ( System.getProperty(DEFAULT_BROKER_URL_KEY) != null) {
System.clearProperty(DEFAULT_BROKER_URL_KEY);
}
System.setProperty(DEFAULT_BROKER_URL_KEY, broker.getDefaultSocketURIString());
if ( System.getProperty(DEFAULT_HTTP_BROKER_URL_KEY) != null) {
System.clearProperty(DEFAULT_HTTP_BROKER_URL_KEY);
}
System.setProperty(DEFAULT_HTTP_BROKER_URL_KEY, httpConnector.getConnectUri().toString());
// define property so that UIMA AS error handler doesnt call System.exit() if the
// error handler action=terminate.
System.setProperty("dontKill","");
System.setProperty("EXTENDED_TESTS","TRUE");
}
public void drainQueues() throws Exception {
if ( broker != null ) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = "............ Deleting ALL messages";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "drainQueues",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
broker.deleteAllMessages();
}
}
protected void cleanBroker( BrokerService targetBroker) throws Exception {
// Remove messages from all queues
// targetBroker.deleteAllMessages();
org.apache.activemq.broker.Connection[] connections = targetBroker.getRegionBroker().getClients();
for( org.apache.activemq.broker.Connection connection : connections) {
try {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = ".............. Forcing Connection Closure - Connection ID:"+connection.getConnectionId();
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "cleanBroker",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
connection.stop();
} catch( Exception e) {
e.printStackTrace();
}
}
ActiveMQDestination[] destinations = targetBroker.getRegionBroker().getDestinations();
if ( destinations != null ) {
for( ActiveMQDestination destination: destinations ) {
if ( !destination.isTopic() ) {
targetBroker.removeDestination(destination);
}
}
}
}
protected String addHttpConnector(int aDefaultPort) throws Exception {
return addHttpConnector(broker, aDefaultPort);
}
protected String addHttpConnector(BrokerService aBroker, int aDefaultPort) throws Exception {
boolean found = false;
while( !found ) {
try {
httpConnector = addConnector(aBroker, "http",aDefaultPort);
// Use reflection to determine if the AMQ version is at least 5.2. If it is, we must
// plug in a broker to the httpConnector otherwise we get NPE when starting the connector.
// AMQ version 4.1.1 doesn't exhibit this problem.
try {
Method m = httpConnector.getClass().getDeclaredMethod("setBrokerService", new Class[] {BrokerService.class});
m.invoke(httpConnector, aBroker);
} catch ( NoSuchMethodException e) {
// Ignore, this is not AMQ 5.2
}
Logger.getRootLogger().info("Adding HTTP Connector:" + httpConnector.getConnectUri()+" Name:"+httpConnector.getName());
httpConnector.start();
return httpConnector.getUri().toString();
} catch ( BindException e) {
aDefaultPort++;
} catch ( IOException e) {
if ( e.getCause() != null && e.getCause() instanceof BindException ) {
aDefaultPort++;
} else {
throw new BrokerConnectionException("Unexpected Exception While Connecting to Broker with URL:"+uri+"\n"+e);
}
}
}
throw new BrokerConnectionException("Unable to acquire Open Port for HTTPConnector");
}
protected String getHttpURI() throws Exception {
while ( httpConnector == null ) {
synchronized(this) {
this.wait(100);
}
}
return httpConnector.getConnectUri().toString();
}
protected void removeQueue(String aQueueName) throws Exception {
httpConnector.stop();
}
protected void removeHttpConnector() throws Exception {
httpConnector.stop();
broker.removeConnector(httpConnector);
}
protected TransportConnector addConnector(BrokerService aBroker, String type, int basePort)
throws BrokerConnectionException{
boolean found = false;
TransportConnector transportConnector = null;
while( !found ) {
try {
String uri = type+"://localhost:" + basePort;
transportConnector = aBroker.addConnector(uri);
found = true;
} catch ( BindException e) {
basePort++;
} catch ( IOException e) {
if ( e.getCause() != null && e.getCause() instanceof BindException ) {
basePort++;
} else {
e.printStackTrace();
throw new BrokerConnectionException("Unexpected Exception While Connecting to Broker with URL:"+uri+"\n"+e);
}
} catch( Exception e) {
throw new BrokerConnectionException("Unexpected Exception While Connecting to Broker with URL:"+uri+"\n"+e);
}
}
return transportConnector;
}
protected String getBrokerUri() {
// return "failover:("+uri+")";
return uri;
}
protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(uri);
}
protected Connection getConnection() throws Exception {
return createConnectionFactory().createConnection();
}
public BrokerService createBroker() throws Exception {
return createBroker(DEFAULT_BROKER_PORT, false);
}
protected BrokerService createBroker(int port,boolean secondaryBroker) throws Exception {
String hostName = "localhost";
boolean enableJMX = true;
String jmxFlag = System.getProperty("uima.as.enable.jmx");
if ( secondaryBroker ) {
enableJMX = false;
} else if ( jmxFlag != null && jmxFlag.equalsIgnoreCase("false") ) {
enableJMX = false;
}
BrokerService broker =
BrokerFactory.createBroker(new URI("broker:()/" + hostName + "?persistent=false"));
tcpConnector = addConnector(broker, "tcp",port);
uri = tcpConnector.getUri().toString();
Logger.getRootLogger().info(">>>> Starting Broker With URL:" + uri);
int defaultJMXPort = 1098;
if ( secondaryBroker ) {
if ( enableJMX ) {
defaultJMXPort = 1097;
broker.getManagementContext().setJmxDomainName(broker.getManagementContext().getJmxDomainName()+".test");
}
tcpConnector.setName(DEFAULT_BROKER_URL_KEY_2);
} else {
tcpConnector.setName(DEFAULT_BROKER_URL_KEY);
}
if ( enableJMX ) {
broker.setUseJmx(enableJMX);
broker.getManagementContext().setConnectorPort(defaultJMXPort);
} else {
broker.setUseJmx(false);
System.out.println("************** ACTIVEMQ JMX Connector Not Enabled ****************");
}
PolicyEntry policy = new PolicyEntry();
policy.setDeadLetterStrategy(new SharedDeadLetterStrategy());
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
broker.setPersistent(false);
broker.setUseShutdownHook(true);
broker.setUseLoggingForShutdownErrors(false);
try {
Method method = broker.getClass().getDeclaredMethod("setSchedulerSupport", new Class[] {Boolean.TYPE});
method.invoke(broker, new Object[] {Boolean.FALSE});
} catch( NoSuchMethodException e) {
// ignore
}
return broker;
}
protected BrokerService setupSecondaryBroker(boolean addProperty) throws Exception {
System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test2");
BrokerService broker2 = createBroker(DEFAULT_BROKER_PORT_2, true);
broker2.start();
if ( addProperty ) {
System.setProperty("BrokerURL", broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
}
return broker2;
}
protected void stopBroker() throws Exception {
if (broker != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = ">>> Stopping Broker";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stopBroker",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
if (tcpConnector != null) {
tcpConnector.stop();
broker.removeConnector(tcpConnector);
//Logger.getRootLogger().info(message);
Logger.getRootLogger().info("Broker Connector:" + tcpConnector.getUri().toString() + " is stopped");
}
removeHttpConnector();
// MBeanServer jmxServer =
// broker.getManagementContext().getMBeanServer();
// if ( jmxServer != null ) {
// Set<ObjectInstance> instances = jmxServer.queryMBeans(null, null);
// Iterator<ObjectInstance> iterator = instances.iterator();
// while (iterator.hasNext()) {
// ObjectInstance instance = iterator.next();
// System.out.println("-------------- Object Name:t" + instance.getObjectName());
//
// }
// }
broker.deleteAllMessages();
// cleanBroker(broker);
broker.stop();
broker.waitUntilStopped();
// cleanBroker(broker);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = ">>> Broker Stopped";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stopBroker",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
}
}
@After
public synchronized void tearDown() throws Exception {
super.tearDown();
System.clearProperty("activemq.broker.jmx.domain");
System.clearProperty("BrokerURL");
wait(3000);
if ( !broker.isStopped()) {
cleanBroker(broker);
stopBroker();
}
System.out.println("..... Free Memory:"+Runtime.getRuntime().freeMemory()+" Total Memory:"+Runtime.getRuntime().totalMemory());
}
public class UimaASErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable arg0) {
}
}
public class TestDefaultMessageListenerContainer extends DefaultMessageListenerContainer
implements ExceptionListener {
volatile boolean failed = false;
String reason = "";
public TestDefaultMessageListenerContainer() {
super();
setExceptionListener(this);
}
protected void recoverAfterListenerSetupFailure() {
}
protected void handleListenerSetupFailure(Throwable t, boolean alreadyHandled) {
if ( !failed && !broker.isStopped()) {
try {
stopSharedConnection();
stop();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = "JMS Listener.shutdown() called";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleListenerSetupFailure",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
//stop();
failed = true;
reason = t.getMessage();
} catch( Exception e) {
}
}
}
public boolean failed() {
return failed;
}
public String getReasonForFailure() {
return reason;
}
@Override
protected void handleListenerException(Throwable ex) {
}
@Override
public void afterPropertiesSet() {
// TODO Auto-generated method stub
super.afterPropertiesSet();
}
@Override
public void onException(JMSException exception) {
// TODO Auto-generated method stub
}
}
public class UimaASExceptionHandler implements ExceptionListener {
@Override
public void onException(JMSException arg0) {
}
}
}