blob: 1d0020c6d6abd0525cc589013b96f8c347a1a743 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.adapter.jms.activemq;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaAsThreadFactory;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.error.ErrorHandler;
import org.apache.uima.aae.error.Threshold;
import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.util.Level;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.JmsException;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public class UimaDefaultMessageListenerContainer extends DefaultMessageListenerContainer implements
ExceptionListener {
/** Class used to obtain the UIMA logger and as the source class name in log records. */
private static final Class<?> CLASS_NAME = UimaDefaultMessageListenerContainer.class;

/** Physical name of the destination; assigned by the initialization thread. */
private String destinationName = "";

/** Endpoint injected through setTargetEndpoint(); null unless that setter was called. */
private Endpoint endpoint;

/** Set only by the boolean constructor. */
private volatile boolean freeCasQueueListener;

/**
 * Controller injected through setController(). Declared volatile because the initialization
 * thread polls this field in a sleep loop while a different thread assigns it; without
 * volatile the polling thread is not guaranteed to ever observe the assignment.
 */
private volatile AnalysisEngineController controller;

/** Set once the first listener setup failure has been handled; guarded by {@link #mux}. */
private volatile boolean failed = false;

/** Lock serializing failure handling in handleListenerSetupFailure(). */
private final Object mux = new Object();

/** Self reference used from anonymous inner classes. */
private final UimaDefaultMessageListenerContainer __listenerRef;

/**
 * Executor injected through setTaskExecutor(). Volatile for the same reason as
 * {@link #controller}: initializeTaskExecutor() polls it while another thread assigns it.
 */
private volatile TaskExecutor taskExecutor = null;

/**
 * Connection factory injected through setConnectionFactory(). Volatile because it is polled in
 * sleep loops by threads other than the one that assigns it.
 */
private volatile ConnectionFactory connectionFactory = null;

/** Monitor used by initializeTaskExecutor() for its timed waits. */
private final Object mux2 = new Object();

/** Thread group created in the constructor and passed to the threads this listener creates. */
private ThreadGroup threadGroup = null;

/** Thread factory created in initializeTaskExecutor(). */
private ThreadFactory tf = null;

// stores number of consumer threads
private int cc = 0;

// stores message listener plugged in by Spring
private Object ml = null;

// A new listener will be injected between
// Spring and the JmsInputChannel Pojo Listener. The purpose of this
// listener is to increment the number of children for
// an input CAS.
private ConcurrentMessageListener concurrentListener = null;

/** Set by destroy(); once true the failure callbacks return without doing anything. */
private volatile boolean awaitingShutdown = false;
public UimaDefaultMessageListenerContainer() {
super();
UIMAFramework.getLogger(CLASS_NAME).setLevel(Level.WARNING);
__listenerRef = this;
setRecoveryInterval(5);
setAcceptMessagesWhileStopping(false);
setExceptionListener(this);
threadGroup = new ThreadGroup("ListenerThreadGroup_"
+ Thread.currentThread().getThreadGroup().getName());
}
/**
 * Creates a listener container with the default settings and records whether it serves as a
 * free CAS queue listener. When the flag is true, setConcurrentConsumers() and
 * setMessageListener() forward their argument to Spring immediately.
 *
 * @param freeCasQueueListener
 *          value stored in the freeCasQueueListener flag
 */
public UimaDefaultMessageListenerContainer(boolean freeCasQueueListener) {
this();
this.freeCasQueueListener = freeCasQueueListener;
}
/**
 * Injects the controller that this listener consults when handling failures and when
 * initializing its task executor.
 *
 * @param aController
 *          the controller to associate with this listener
 */
public void setController(AnalysisEngineController aController) {
  this.controller = aController;
}
/**
 * Decides whether a failure is severe enough to disable this listener. That is the case when
 * the textual form of the failure mentions {@code SharedConnectionNotInitializedException}, or
 * when the failure is a {@link JMSException} caused by a {@link ConnectException}.
 *
 * @param t
 *          the failure to classify
 * @return true if the listener should be disabled
 */
private boolean disableListener(Throwable t) {
  String description = t.toString();
  System.out.println(description);
  // contains() also matches at index 0, which the former "indexOf(..) > 0" test missed
  if (description.contains("SharedConnectionNotInitializedException")) {
    return true;
  }
  // instanceof is false for a null cause, so no separate null check is needed
  return t instanceof JMSException && t.getCause() instanceof ConnectException;
}
/**
 * Asks the input channel that owns this listener's queue to destroy the listener. Only acts
 * when the controller is an aggregate; does nothing once shutdown has begun. Any exception is
 * logged at WARNING level and not propagated.
 */
private void handleListenerFailure() {
  if (awaitingShutdown) {
    // Shutdown already in progress, nothing to do
    return;
  }
  try {
    if (!(controller instanceof AggregateAnalysisEngineController)) {
      return;
    }
    AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) controller;
    String delegateKey = aggregate.lookUpDelegateKey(endpoint.getEndpoint());
    // Prefer the resolved destination; fall back to the endpoint name
    String queueName = (endpoint.getDestination() != null)
            ? endpoint.getDestination().toString()
            : endpoint.getEndpoint();
    InputChannel channel = aggregate.getInputChannel(queueName);
    if (channel == null) {
      System.out.println(">>> Listener Unable To LookUp InputChannel For Queue:" + queueName);
    } else {
      channel.destroyListener(queueName, delegateKey);
    }
  } catch (Exception ex) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
              "handleListenerFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), ex });
    }
  }
}
/**
 * Handles a failure of a listener attached to a temp (reply) queue. The failure is logged.
 * If it is a closed-consumer failure and the controller is an aggregate, the current listener
 * is stopped via handleListenerFailure(). Otherwise, if the failure qualifies for disabling
 * the listener, it is handled like a failure on a non-temp queue.
 *
 * @param t
 *          the failure reported for the listener
 */
private void handleTempQueueFailure(Throwable t) {
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
            "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAJMS_jms_listener_failed_WARNING",
            new Object[] { endpoint.getDestination(), getBrokerUrl(), t });
  }
  // Check if the failure is due to the failed connection. Spring (and ActiveMQ) don't seem to
  // provide the cause, just the top level IllegalStateException with a text message, so the
  // message text is what has to be checked. The literal is the receiver of equals() so that an
  // exception without a message does not cause a NullPointerException here.
  boolean consumerClosed = t instanceof javax.jms.IllegalStateException
          && "The Consumer is closed".equals(t.getMessage());
  if (consumerClosed) {
    if (controller instanceof AggregateAnalysisEngineController) {
      String delegateKey = ((AggregateAnalysisEngineController) controller)
              .lookUpDelegateKey(endpoint.getEndpoint());
      try {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(
                  Level.INFO,
                  this.getClass().getName(),
                  "handleTempQueueFailure",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_stopping_listener_INFO",
                  new Object[] { controller.getComponentName(), endpoint.getDestination(),
                      delegateKey });
        }
        // Stop current listener
        handleListenerFailure();
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
                  "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_stopped_listener_INFO",
                  new Object[] { controller.getComponentName(), endpoint.getDestination() });
        }
      } catch (Exception e) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
        }
      }
    }
  } else if (disableListener(t)) {
    handleQueueFailure(t);
  }
}
/**
 * Searches the controller's error handler chain for the handler responsible for GetMeta
 * errors.
 *
 * @return the first {@link GetMetaErrorHandler} in the chain, or null if the chain has none
 */
private ErrorHandler fetchGetMetaErrorHandler() {
  // Walk the error handlers provided in the deployment descriptor
  for (Iterator handlers = controller.getErrorHandlerChain().iterator(); handlers.hasNext();) {
    ErrorHandler candidate = (ErrorHandler) handlers.next();
    if (candidate instanceof GetMetaErrorHandler) {
      return candidate;
    }
  }
  return null;
}
/**
 * Handles failures on non-temp queues. The failure is logged, then the listener is shut down
 * on a separate thread. Unless the failed delegate could be disabled instead, the whole service
 * is terminated.
 *
 * @param t
 *          the failure reported for the listener
 */
private void handleQueueFailure(Throwable t) {
  final String endpointName = (getDestination() == null) ? ""
          : ((ActiveMQDestination) getDestination()).getPhysicalName();
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
            "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAJMS_jms_listener_failed_WARNING",
            new Object[] { endpointName, getBrokerUrl(), t });
  }
  boolean terminate = true;
  // Check if the failure is severe enough to disable this listener. Whether the service is
  // terminated or only the delegate is disabled depends on the action configured for the
  // delegate in the GetMeta Error Handler.
  if (disableListener(t)) {
    endpoint.setReplyDestinationFailed();
    AnalysisEngineController current = controller;
    if (current instanceof AggregateAnalysisEngineController) {
      terminate = !disableDelegateForFailedBroker((AggregateAnalysisEngineController) current);
    }
  }
  System.out.println("****** Unable To Connect Listener To Broker:" + getBrokerUrl());
  System.out.println("****** Closing Listener on Queue:" + endpoint.getEndpoint());
  setRecoveryInterval(0);
  // Spin a shutdown thread to terminate listener.
  new Thread() {
    public void run() {
      try {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          // CLASS_NAME is used as the source class; "this" would name the anonymous Thread class
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_disable_listener__WARNING",
                  new Object[] { endpointName, getBrokerUrl() });
        }
        shutdown();
      } catch (Exception e) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
        }
      }
    }
  }.start();
  if (terminate) {
    terminate(t);
  }
}

/**
 * Disables the delegate associated with this listener's endpoint when the GetMeta error
 * handler configures the DISABLE action for it.
 *
 * @param aggregate
 *          the aggregate controller owning the delegate
 * @return true if the delegate was disabled; false if no GetMeta handler, threshold or DISABLE
 *         action applies, or if disabling failed, in which case the caller terminates the service
 */
private boolean disableDelegateForFailedBroker(AggregateAnalysisEngineController aggregate) {
  ErrorHandler handler = fetchGetMetaErrorHandler();
  if (handler == null) {
    // No GetMeta handler in the chain; previously this caused a NullPointerException
    return false;
  }
  // Fetch a Map containing thresholds for GetMeta for each delegate.
  Map<?, ?> thresholds = handler.getEndpointThresholdMap();
  // Lookup delegate's key using delegate's endpoint name
  String delegateKey = aggregate.lookUpDelegateKey(endpoint.getEndpoint());
  if (thresholds == null || delegateKey == null || !thresholds.containsKey(delegateKey)) {
    return false;
  }
  // Fetch the Threshold object containing error configuration
  Threshold threshold = (Threshold) thresholds.get(delegateKey);
  if (threshold == null || !ErrorHandler.DISABLE.equalsIgnoreCase(threshold.getAction())) {
    return false;
  }
  // The disable delegate method takes a list of delegates
  List<String> delegatesToDisable = new ArrayList<String>();
  delegatesToDisable.add(delegateKey);
  try {
    System.out.println(">>>> Controller:" + aggregate.getComponentName()
            + " Disabling Listener On Queue:" + endpoint.getEndpoint() + ". Component's "
            + delegateKey + " Broker:" + getBrokerUrl() + " is Invalid");
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
              "handleQueueFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_disabled_delegate_bad_broker__INFO",
              new Object[] { aggregate.getComponentName(), delegateKey, getBrokerUrl() });
    }
    // Remove the delegate from the routing table.
    aggregate.disableDelegates(delegatesToDisable);
    return true; // just disable the delegate and continue
  } catch (Exception e) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
              "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
    }
    return false;
  }
}
/**
 * This method is called by Spring when a listener fails. Nothing happens once shutdown has
 * begun or the controller has stopped. Otherwise the controller, if present, is moved to the
 * FAILED state. Without an endpoint the connection is refreshed until it succeeds and the
 * controller is moved back to RUNNING. With an endpoint the first failure is routed to the
 * temp-queue or regular-queue failure handler.
 *
 * @param t
 *          the failure reported by Spring
 * @param alreadyHandled
 *          not consulted by this implementation
 */
protected void handleListenerSetupFailure(Throwable t, boolean alreadyHandled) {
  // Nothing to recover if shutting down or if the controller has already stopped
  if (awaitingShutdown || (controller != null && controller.isStopped())) {
    return;
  }
  if (controller != null) {
    controller.changeState(ServiceState.FAILED);
  }
  if (endpoint != null) {
    // Connection failure that occurs AFTER the service initialized.
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
              "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), t });
    }
    synchronized (mux) {
      if (!failed) {
        // A listener on a temp queue is a reply queue listener; its failure is handled
        // differently from that of an input queue listener.
        if (endpoint.isTempReplyDestination()) {
          handleTempQueueFailure(t);
        } else {
          handleQueueFailure(t);
        }
      }
      failed = true;
    }
    return;
  }
  // No endpoint yet: this code executes during initialization of the service and the
  // connection to a broker cannot be established.
  super.handleListenerSetupFailure(t, true);
  String serviceLabel = (controller != null)
          ? "Uima AS Service:" + controller.getComponentName()
          : "";
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
            "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAJMS_listener_connection_failure__WARNING",
            new Object[] { serviceLabel, getBrokerUrl() });
  }
  System.out.println(serviceLabel + " Listener Unable to Connect to Broker:" + getBrokerUrl()
          + " Retrying ....");
  // Keep trying until the broker becomes available.
  refreshConnectionUntilSuccessful();
  if (controller != null) {
    controller.changeState(ServiceState.RUNNING);
  }
  System.out.println(serviceLabel + " Listener Established Connection to Broker:"
          + getBrokerUrl());
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
            "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAJMS_listener_connection_recovered__WARNING",
            new Object[] { serviceLabel, getBrokerUrl() });
  }
}
/**
 * Terminates the service after a listener failure. The termination is announced on the console
 * and in the log. If a controller is present, its listeners are notified with a
 * {@link ResourceInitializationException} wrapping the failure, and the controller is stopped
 * unless it is already stopped or awaiting a cache callback notification.
 *
 * @param t
 *          the failure that triggers termination
 */
private void terminate(Throwable t) {
  // The controller may not have been injected yet; report with an empty name in that case
  // instead of failing with a NullPointerException.
  AnalysisEngineController current = controller;
  String componentName = (current == null) ? "" : current.getComponentName();
  System.out.println(">>>> Terminating Controller:" + componentName
          + " Unable To Initialize Listener Due to Invalid Broker URL:" + getBrokerUrl());
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
            "terminate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAJMS_terminate_service_dueto_bad_broker__WARNING",
            new Object[] { componentName, getBrokerUrl() });
  }
  if (current == null) {
    // Nothing to notify or stop
    return;
  }
  current.notifyListenersWithInitializationStatus(new ResourceInitializationException(t));
  if (!current.isStopped() && !current.isAwaitingCacheCallbackNotification()) {
    current.stop();
  }
}
/**
 * Logs a listener exception at WARNING level together with the queue name and broker URL, then
 * passes it on to the superclass. Does nothing once shutdown has begun.
 *
 * @param t
 *          the exception raised while the listener was running
 */
protected void handleListenerException(Throwable t) {
  if (awaitingShutdown) {
    // Already shutdown, nothing to do
    return;
  }
  String endpointName = "";
  if (getDestination() != null) {
    endpointName = ((ActiveMQDestination) getDestination()).getPhysicalName();
  }
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
    Object[] details = new Object[] { endpointName, getBrokerUrl(), t };
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
            "handleListenerException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAJMS_jms_listener_failed_WARNING", details);
  }
  super.handleListenerException(t);
}
/**
 * Invokes the superclass implementation of afterPropertiesSet() directly, bypassing the
 * override in this class. Called from the initialization thread started by
 * afterPropertiesSet().
 */
private void allPropertiesSet() {
super.afterPropertiesSet();
}
/**
 * Waits, polling every 50 ms, until a connection factory has been provided through
 * setConnectionFactory() and then hands it to the superclass. If the waiting thread is
 * interrupted, the interrupt status is restored and the method returns without injecting
 * anything, so a cancelled initialization does not spin here forever.
 */
private void injectConnectionFactory() {
  while (connectionFactory == null) {
    try {
      Thread.sleep(50);
    } catch (InterruptedException e) {
      // Preserve the interrupt for the caller and give up waiting
      Thread.currentThread().interrupt();
      return;
    }
  }
  super.setConnectionFactory(connectionFactory);
}
/**
 * Hands the task executor stored by setTaskExecutor() to the superclass. The override of
 * setTaskExecutor() in this class only stores the executor, so this is the point where Spring's
 * container actually receives it.
 */
private void injectTaskExecutor() {
super.setTaskExecutor(taskExecutor);
}
/**
 * Tells whether this listener's message selector is exactly {@code Command=2001}.
 *
 * @return true if the selector is set and equals {@code Command=2001}
 */
private boolean isGetMetaListener() {
  // Literal-first equals() is false for a null selector
  return "Command=2001".equals(getMessageSelector());
}
/**
 * Tells whether a destination is set and is an {@link ActiveMQDestination}.
 *
 * @return true if the current destination is an ActiveMQ destination
 */
private boolean isActiveMQDestination() {
  // instanceof is false for null, which covers the "no destination" case
  return getDestination() instanceof ActiveMQDestination;
}
/**
 * Initializes the container on the calling thread: injects the connection factory, prepares
 * and injects the task executor, and runs the superclass initialize(). Any exception is logged
 * at WARNING level and not propagated.
 */
public void initializeContainer() {
  try {
    injectConnectionFactory();
    initializeTaskExecutor();
    injectTaskExecutor();
    super.initialize();
  } catch (Exception failure) {
    if (!UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      return;
    }
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
            "initializeContainer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), failure });
  }
}
/**
 * Intercepts the Spring call that sets the number of consumer threads. The value is always
 * remembered locally; it is forwarded to Spring right away only for a free CAS queue listener.
 * For other listeners the stored value is applied later in afterPropertiesSet().
 *
 * @param concurrentConsumers
 *          requested number of consumer threads
 **/
public void setConcurrentConsumers(int concurrentConsumers) {
  this.cc = concurrentConsumers;
  if (!freeCasQueueListener) {
    return;
  }
  super.setConcurrentConsumers(concurrentConsumers);
}
/**
 * Intercepts the Spring call that injects the application Pojo listener. The listener is always
 * remembered locally; it is forwarded to Spring right away only for a free CAS queue listener.
 * For other listeners afterPropertiesSet() decides which listener Spring receives.
 *
 * @param messageListener
 *          the listener supplied by Spring
 **/
public void setMessageListener(Object messageListener) {
  this.ml = messageListener;
  if (!freeCasQueueListener) {
    return;
  }
  super.setMessageListener(messageListener);
}
/**
* Called by Spring and some Uima AS components when all properties have been set. This method
* first decides which message listener and how many consumers Spring receives, then spins a
* thread (in this listener's thread group) in which the listener is initialized. The method
* returns as soon as that thread has been started; failures inside the thread are logged at
* WARNING level.
*/
public void afterPropertiesSet() {
if (endpoint != null) {
// Endpoint has been plugged in from spring xml. This means this is a listener
// for a reply queue. We need to rewire things a bit. First make Spring use
// one thread to make sure we receive messages in order. To fix a race condition
// where a parent CAS is processed first instead of its last child, we need to
// ensure that we get the child first. We need to update the counter of the
// parent CAS to reflect that there is another child. In the race condition that
// was observed, the parent was being processed first in one thread. The parent
// reached the final step and subsequently was dropped. After that, a
// child CAS processed on another thread began executing and failed since a lookup
// of its parent resulted in CAS Not Found In Cache Exception.
// Make sure Spring uses one thread
super.setConcurrentConsumers(1);
if (cc > 1) {
// More than one consumer was requested: wrap the application listener in a
// ConcurrentMessageListener that is given the requested consumer count.
try {
concurrentListener = new ConcurrentMessageListener(cc, ml);
super.setMessageListener(concurrentListener);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
// Without the wrapper the listener cannot be set up; the initialization thread is not started
return;
}
} else {
super.setMessageListener(ml);
}
} else {
// No endpoint: pass the stored listener and consumer count to Spring unchanged
super.setMessageListener(ml);
super.setConcurrentConsumers(cc);
}
Thread t = new Thread(threadGroup, new Runnable() {
public void run() {
Destination destination = __listenerRef.getDestination();
try {
// Wait until the connection factory is injected by Spring
while (connectionFactory == null) {
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
}
}
// Publish the broker URL as a JVM-wide system property
System.setProperty("BrokerURI", ((ActiveMQConnectionFactory) connectionFactory)
.getBrokerURL());
boolean done = false;
// For a non-temporary ActiveMQ destination whose selector is not the GetMeta one,
// register with the input channel first and then wait for the controller
if (isActiveMQDestination() && !isGetMetaListener()
&& !((ActiveMQDestination) destination).isTemporary()) {
// Add self to InputChannel
connectWithInputChannel();
// Remember that the input channel is already connected, then poll until a
// controller has been plugged in
done = true;
while (controller == null)
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
}
;
}
// Plug in connection Factory to Spring's Listener
__listenerRef.injectConnectionFactory();
// Initialize the TaskExecutor. This call injects a custom Thread Pool into the
// TaskExecutor provided in the spring xml. The custom thread pool initializes
// an instance of AE in a dedicated thread
initializeTaskExecutor();
// Plug in TaskExecutor to Spring's Listener
__listenerRef.injectTaskExecutor();
// Notify Spring Listener that all properties are ready
__listenerRef.allPropertiesSet();
if (isActiveMQDestination() && destination != null) {
destinationName = ((ActiveMQDestination) destination).getPhysicalName();
}
// Connect with the input channel now unless that already happened above
if (!done) {
connectWithInputChannel();
done = true;
}
if (concurrentListener != null) {
concurrentListener.setAnalysisEngineController(controller);
}
// Save number of concurrent consumers on the temp reply queue in case we need to
// recreate a new listener on a new temp queue created during recovery
if (endpoint != null && controller instanceof AggregateAnalysisEngineController) {
Delegate delegate = ((AggregateAnalysisEngineController) controller)
.lookupDelegate(endpoint.getDelegateKey());
if (delegate != null) {
delegate.getEndpoint().setConcurrentReplyConsumers(cc);
}
}
// Show ready message on the console only if this listener is *not* listening
// on an input queue. Input queue listeners are not started until the service
// is fully initialized
// NOTE(review): controller is dereferenced here without a null check; on the paths that
// skip the controller wait above it may still be null - confirm.
if (__listenerRef.getMessageListener() == null && getDestination() != null) {
System.out.println("Service:" + controller.getComponentName()
+ " Listener Ready. Broker:" + getBrokerUrl() + " Queue:" + getDestination());
}
} catch (Exception e) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
"afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_jms_listener_failed_WARNING",
new Object[] { destination, getBrokerUrl(), e });
}
}
});
t.start();
}
/**
 * Injects this listener into the Pojo listener it fronts. For a {@link JmsInputChannel} the
 * method first waits, polling every 50 ms, until the channel has a controller, then registers
 * this container with it. For a {@link ModifiableListener} the container is registered
 * immediately. Other listener types are left untouched.
 *
 * @throws Exception
 *           in particular {@link InterruptedException} if the thread is interrupted while
 *           waiting for the channel's controller
 */
private void connectWithInputChannel() throws Exception {
  Object pojoListener = getPojoListener();
  if (pojoListener instanceof JmsInputChannel) {
    JmsInputChannel inputChannel = (JmsInputChannel) pojoListener;
    // Wait until InputChannel has a valid controller. The controller will be plugged in
    // by Spring on a different thread. An interrupt is propagated to the caller rather than
    // swallowed, so the wait can be cancelled.
    while (inputChannel.getController() == null) {
      Thread.sleep(50);
    }
    inputChannel.setListenerContainer(__listenerRef);
  } else if (pojoListener instanceof ModifiableListener) {
    ((ModifiableListener) pojoListener).setListener(__listenerRef);
  }
}
/**
 * Returns the destination name recorded by the initialization thread.
 *
 * @return the recorded physical destination name, or an empty string if none was recorded
 */
public String getDestinationName() {
  return this.destinationName;
}
/**
 * Returns the physical name of the current destination.
 *
 * @return the physical name, or null if no destination is set
 */
public String getEndpointName() {
  Destination current = getDestination();
  return (current == null) ? null : ((ActiveMQDestination) current).getPhysicalName();
}
/**
 * Returns the broker URL of the injected connection factory, which is cast to
 * {@link ActiveMQConnectionFactory}.
 *
 * @return the broker URL reported by the connection factory
 */
public String getBrokerUrl() {
  ActiveMQConnectionFactory amqFactory = (ActiveMQConnectionFactory) connectionFactory;
  return amqFactory.getBrokerURL();
}
/**
 * Overrides the Spring setter to keep a local reference to the connection factory in addition
 * to passing it, unchanged, to the superclass. The local reference is what the wait loops in
 * this class poll and what getBrokerUrl() reads.
 *
 * NOTE(review): an earlier comment here described rewriting the broker URL (appending
 * maxInactivityDuration=0) and copying the prefetch policy into a new factory; this method
 * performs no such rewriting.
 *
 * @param aConnectionFactory
 *          the connection factory supplied by Spring
 */
public void setConnectionFactory(ConnectionFactory aConnectionFactory) {
  this.connectionFactory = aConnectionFactory;
  super.setConnectionFactory(aConnectionFactory);
}
/**
 * Registers this listener with the resolver when it is a {@link TempDestinationResolver}, then
 * passes the resolver to the superclass. Resolvers of any other type are passed through
 * without registration instead of failing on the cast.
 *
 * @param resolver
 *          the destination resolver supplied by Spring
 */
public void setDestinationResolver(DestinationResolver resolver) {
  if (resolver instanceof TempDestinationResolver) {
    ((TempDestinationResolver) resolver).setListener(this);
  }
  super.setDestinationResolver(resolver);
}
/**
 * Closes the shared JMS connection after turning off recovery, message acceptance while
 * stopping, and auto startup. Exceptions raised along the way are logged at WARNING level and
 * not propagated.
 *
 * @throws Exception
 *           declared for callers; failures inside the method body are caught and logged
 */
public void closeConnection() throws Exception {
  try {
    setRecoveryInterval(0);
    setAcceptMessagesWhileStopping(false);
    setAutoStartup(false);
    Connection shared = getSharedConnection();
    shared.close();
  } catch (Exception failure) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      Object[] details = new Object[] { Thread.currentThread().getId(), failure };
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
              "closeConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_exception__WARNING", details);
    }
  }
}
/**
 * Sets the destination on the superclass and, when an endpoint is attached, mirrors it on the
 * endpoint together with the broker URL. For a {@link TemporaryQueue} the endpoint is marked as
 * a temp reply destination and, if the Pojo listener is a {@link JmsInputChannel}, this
 * container is registered with it.
 *
 * @param aDestination
 *          the destination this listener consumes from
 */
public void setDestination(Destination aDestination) {
  super.setDestination(aDestination);
  if (endpoint == null) {
    return;
  }
  endpoint.setDestination(aDestination);
  if (aDestination instanceof TemporaryQueue) {
    endpoint.setTempReplyDestination(true);
    Object pojoListener = getPojoListener();
    // Test for the type that is actually cast to; testing the broader InputChannel type
    // allowed a ClassCastException for any other InputChannel implementation.
    if (pojoListener instanceof JmsInputChannel) {
      ((JmsInputChannel) pojoListener).setListenerContainer(this);
    }
  }
  endpoint.setServerURI(getBrokerUrl());
}
/**
 * Returns the application listener: the one stored by setMessageListener() if present,
 * otherwise whatever listener the superclass currently holds.
 *
 * @return the listener, or null if neither is set
 */
private Object getPojoListener() {
  return (ml != null) ? ml : getMessageListener();
}
/**
 * Returns the destination this listener is configured with.
 *
 * @return the value of getDestination(), unchanged
 */
public Destination getListenerEndpoint() {
return getDestination();
}
/**
 * JMS exception listener callback. Logs the exception at WARNING level together with the queue
 * name and broker URL; takes no other action. Does nothing once shutdown has begun.
 *
 * @param arg0
 *          the exception reported by the JMS provider
 */
public void onException(JMSException arg0) {
  if (awaitingShutdown) {
    return;
  }
  String endpointName = "";
  if (getDestination() != null) {
    endpointName = ((ActiveMQDestination) getDestination()).getPhysicalName();
  }
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
    Object[] details = new Object[] { endpointName, getBrokerUrl(), arg0 };
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
            "onException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAJMS_jms_listener_failed_WARNING", details);
  }
}
/**
 * Attaches the endpoint this listener serves. afterPropertiesSet() treats a listener with an
 * endpoint as a reply queue listener.
 *
 * @param anEndpoint
 *          the endpoint to attach
 */
public void setTargetEndpoint(Endpoint anEndpoint) {
  this.endpoint = anEndpoint;
}
/**
 * Returns the flag given to the boolean constructor.
 *
 * @return true if this container was created as a free CAS queue listener
 */
public boolean isFreeCasQueueListener() {
  return this.freeCasQueueListener;
}
/**
 * Passes an updated task executor straight to the superclass and reports the change on the
 * console. The locally stored executor is not changed.
 *
 * @param taskExecutor
 *          the replacement executor
 */
protected void setModifiedTaskExecutor(TaskExecutor taskExecutor) {
  super.setTaskExecutor(taskExecutor);
  String notice = "Injected Updated Task Executor Into Listener For Destination:"
          + getDestination();
  System.out.println(notice);
}
/**
 * Delegates shutdown to the superclass by calling its destroy() directly, bypassing the
 * destroy() override in this class.
 */
public void doDestroy() {
super.destroy();
}
/**
 * Sets the message selector and turns off auto startup for this listener.
 *
 * @param messageSelector
 *          the JMS selector expression
 */
public void setMessageSelector(String messageSelector) {
  super.setMessageSelector(messageSelector);
  // Selectors are only used on input queues. Listeners on such a queue must not
  // start now; they are started once the service has initialized.
  setAutoStartup(false);
}
/**
* Spins a shutdown thread that stops the Spring and ActiveMQ threads. The first call marks the
* listener as awaiting shutdown and starts the thread; later calls return immediately. The
* thread closes the connection, stops the executor in use, shuts the container down, waits for
* this listener's thread group to drain, and finally destroys the group.
*/
public void destroy() {
if (awaitingShutdown) {
return;
}
awaitingShutdown = true;
// Spin a thread that will wait until all threads complete. This is needed to avoid
// memory leak caused by the fact that we did not wait to collect the threads.
// The destroyer runs in the grandparent of this listener's thread group, so it is not
// counted among the threads it waits for.
Thread threadGroupDestroyer = new Thread(threadGroup.getParent().getParent(),
"threadGroupDestroyer") {
public void run() {
try {
// stop Spring listener and ActiveMQ threads
// NOTE(review): stop() is overridden in this class to call destroy(), which returns at
// once because awaitingShutdown is already true; only closeConnection() does work here.
__listenerRef.stop();
__listenerRef.closeConnection();
} catch (Exception e) {
}
// If using non-default TaskExecutor, stop its threads
if (taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdown();
// Since the calling thread may be one of those managed by the executor allow
// for one open thread when checking active thread count.
while (((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().getActiveCount() > 1
&& !((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor()
.isTerminated()) {
try {
((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().awaitTermination(200,
TimeUnit.MILLISECONDS);
} catch (Exception e) {
}
}
} else if (concurrentListener != null) {
// Stop internal Executor
concurrentListener.stop();
}
// Shutdown the listener
__listenerRef.shutdown();
// At FINEST level, dump the parent thread group for diagnostics
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
threadGroup.getParent().list();
}
// Wait until all threads are accounted for
while (threadGroup.activeCount() > 0) {
try {
Thread[] threads = new Thread[threadGroup.activeCount()];
threadGroup.enumerate(threads);
boolean foundExpectedThreads = true;
for (Thread t : threads) {
try {
String tName = t.getName();
// The following is necessary to account for the AMQ threads
// Any threads not named in the list below will cause a wait
// and retry until all non-amq threads are stopped
if (!tName.startsWith("main") && !tName.equalsIgnoreCase("timer-0")
&& !tName.equals("ReaderThread") && !tName.equals("BrokerThreadGroup")
&& !tName.startsWith("ActiveMQ")) {
foundExpectedThreads = false;
break; // from for
}
} catch (Exception e) {
// Ignored; this also covers null array slots left when fewer threads were
// enumerated than activeCount() reported
}
}
if (foundExpectedThreads) {
break; // from while
}
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
// Only the tolerated threads (or none) remain: destroy the group if still alive
try {
synchronized (threadGroup) {
if (!threadGroup.isDestroyed()) {
threadGroup.destroy();
}
}
} catch (Exception e) {
} // Ignore
}
};
threadGroupDestroyer.start();
}
/**
 * Called by Spring to inject the TaskExecutor. The executor is only stored here; it reaches
 * the superclass later through injectTaskExecutor().
 *
 * @param aTaskExecutor
 *          the executor supplied by Spring
 */
public void setTaskExecutor(TaskExecutor aTaskExecutor) {
  this.taskExecutor = aTaskExecutor;
}
/**
 * Initializes the injected ThreadPoolTaskExecutor with a custom thread factory. Each thread
 * produced by the factory first initializes an instance of the AE before it is added to the
 * pool, and the same thread is later used to call that AE's process() method. Does nothing
 * unless the controller is a primitive controller.
 *
 * @throws Exception
 *           if waiting for the executor is interrupted or executor setup fails
 */
private void initializeTaskExecutor() throws Exception {
  // TaskExecutor is only used with primitives
  if (!(controller instanceof PrimitiveAnalysisEngineController)) {
    return;
  }
  // The TaskExecutor is plugged in by Spring; if it is not there yet, wait for it
  synchronized (mux2) {
    while (taskExecutor == null) {
      mux2.wait(20);
    }
  }
  // The factory receives the primitive controller so that every new thread can call it
  // to initialize the next available instance of an AE.
  PrimitiveAnalysisEngineController primitive = (PrimitiveAnalysisEngineController) controller;
  tf = new UimaAsThreadFactory(threadGroup, primitive);
  ThreadPoolTaskExecutor pool = (ThreadPoolTaskExecutor) taskExecutor;
  // Use the custom thread factory instead of the default one, then build the pool
  pool.setThreadFactory(tf);
  pool.initialize();
  // Starting every core thread forces each one to initialize its AE instance now
  pool.getThreadPoolExecutor().prestartAllCoreThreads();
  // Change the state of a collocated service
  if (!controller.isTopLevelComponent()) {
    controller.changeState(ServiceState.RUNNING);
  }
}
/**
 * Stops this listener by refusing messages while stopping and then running destroy(), which
 * performs the shutdown on a separate thread.
 *
 * @throws JmsException
 *           declared by the overridden method
 */
public void stop() throws JmsException {
setAcceptMessagesWhileStopping(false);
destroy();
}
}