blob: d9e0ffbe8557df9bf4915cc842a413c17f5c3ff5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.adapter.jms.client;
import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
import javax.naming.InitialContext;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.uima.UIMAFramework;
import org.apache.uima.UIMA_IllegalArgumentException;
import org.apache.uima.UIMA_IllegalStateException;
import org.apache.uima.aae.AsynchAECasManager_impl;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.VersionCompatibilityChecker;
import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
import org.apache.uima.aae.UimaASApplicationExitEvent;
import org.apache.uima.aae.UimaAsVersion;
import org.apache.uima.aae.client.UimaASStatusCallbackListener;
import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.ControllerCallbackListener;
import org.apache.uima.aae.controller.ControllerLifecycle;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.UimacppServiceController;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.UimaASMetaRequestTimeout;
import org.apache.uima.aae.jmx.JmxManager;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.UIMAMessage;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.ConnectionFactoryIniter;
import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext;
import org.apache.uima.adapter.jms.service.Dd2spring;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.SerialFormat;
import org.apache.uima.impl.UimaVersion;
import org.apache.uima.internal.util.UUIDGenerator;
import org.apache.uima.resource.Resource;
import org.apache.uima.resource.ResourceConfigurationException;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceManager;
import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.util.Level;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.support.FileSystemXmlApplicationContext;
public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineCommon_impl
implements UimaAsynchronousEngine, MessageListener, ControllerCallbackListener, ApplicationListener<ApplicationEvent>{
private static final Class CLASS_NAME = BaseUIMAAsynchronousEngine_impl.class;
private MessageSender sender = null;
private MessageProducer producer;
private Session session = null;
private Session consumerSession = null;
private volatile boolean serviceInitializationException;
private volatile boolean serviceInitializationCompleted;
private Semaphore serviceSemaphore = new Semaphore(1);
private Queue consumerDestination = null;
private Session producerSession = null;
private JmxManager jmxManager = null;
private String applicationName = "UimaASClient";
private String endpoint;
protected static Semaphore sharedConnectionSemaphore = new Semaphore(1);
protected static Object connectionMux = new Object();
protected InitialContext jndiContext;
private ObjectName clientJmxObjectName = null;
private String amqUser = null;
private String amqPassword = null;
protected static Lock globalLock = new ReentrantLock();
public BaseUIMAAsynchronousEngine_impl() {
super();
UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
"UIMA Version " + UIMAFramework.getVersionString() +
" UIMA-AS Version " + UimaAsVersion.getVersionString());
}
protected TextMessage createTextMessage() throws ResourceInitializationException {
return new ActiveMQTextMessage();
}
protected BytesMessage createBytesMessage() throws ResourceInitializationException {
return new ActiveMQBytesMessage();
}
/**
* Called at the end of collectionProcessingComplete - WAS closes receiving thread here
*/
protected void cleanup() throws Exception {
}
/**
* Return a name of the queue to which the JMS Producer is connected to.
*/
public String getEndPointName() throws ResourceProcessException {
try {
return clientSideJmxStats.getEndpointName();
} catch (Exception e) {
throw new ResourceProcessException(e);
}
}
protected void setMetaRequestMessage(Message msg) throws Exception {
msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.GetMeta);
msg.setJMSReplyTo(consumerDestination);
if (msg instanceof TextMessage) {
((ActiveMQTextMessage) msg).setText("");
}
}
/**
* Initialize JMS Message with properties relevant to Process CAS request.
*/
protected void setCASMessage(String aCasReferenceId, CAS aCAS, Message msg)
throws ResourceProcessException {
try {
setCommonProperties(aCasReferenceId, msg, SerialFormat.XMI);
((TextMessage) msg).setText(serializeCAS(aCAS));
} catch (Exception e) {
throw new ResourceProcessException(e);
}
}
protected void setCASMessage(String aCasReferenceId, String aSerializedCAS, Message msg)
throws ResourceProcessException {
try {
setCommonProperties(aCasReferenceId, msg, SerialFormat.XMI);
((TextMessage) msg).setText(aSerializedCAS);
} catch (Exception e) {
throw new ResourceProcessException(e);
}
}
protected void setCASMessage(String aCasReferenceId, byte[] aSerializedCAS, Message msg)
throws ResourceProcessException {
try {
setCommonProperties(aCasReferenceId, msg, SerialFormat.BINARY);
((BytesMessage) msg).writeBytes(aSerializedCAS);
} catch (Exception e) {
throw new ResourceProcessException(e);
}
}
protected void setCommonProperties(String aCasReferenceId, Message msg,
SerialFormat serialFormat) throws ResourceProcessException {
try {
msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Process);
msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
msg.setIntProperty(AsynchAEMessage.Payload, (serialFormat == SerialFormat.XMI) ? AsynchAEMessage.XMIPayload : AsynchAEMessage.BinaryPayload);
msg.setBooleanProperty(AsynchAEMessage.AcceptsDeltaCas, true);
msg.setJMSReplyTo(consumerDestination);
} catch (Exception e) {
throw new ResourceProcessException(e);
}
}
private void stopConnection() {
SharedConnection sharedConnection;
// there is a dedicated shared connection for each broker URI
if ( brokerURI != null && (sharedConnection = lookupConnection(brokerURI)) != null) {
try {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = ":::::::::::::::: "+Thread.currentThread().getId()+" stopConnection() - acquiring semaphore";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stopConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
// use broker dedicated semaphore to lock the code for updates
sharedConnection.getSemaphore().acquire();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg =":::::::::::::::: "+Thread.currentThread().getId()+" stopConnection() - acquired semaphore";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stopConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
// Remove a client from registry
sharedConnection.unregisterClient(this);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = ":::::::::::::::: "+Thread.currentThread().getId()+" stopConnection() - 1";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stopConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
ActiveMQConnection amqc = (ActiveMQConnection)sharedConnection.getConnection();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = ":::::::::::::::: "+Thread.currentThread().getId()+" stopConnection() - 2";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stopConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
if (initialized) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = ":::::::::::::::: "+Thread.currentThread().getId()+" stopConnection() - 3";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stopConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
try {
if ( amqc != null ) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = "................ Closing Client Connection client ID:"+amqc.getConnectionInfo().getClientId()+" Client Count:"+sharedConnection.getClientCount();
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stopConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = ":::::::::::::::: "+Thread.currentThread().getId()+" stopConnection() - 5";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stopConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
}
if ( amqc != null && amqc.isStarted() &&
((ActiveMQSession)consumerSession).isRunning() ) {
consumerSession.close();
((ActiveMQMessageConsumer)consumer).stop();
consumer.close();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = ":::::::::::::::: "+Thread.currentThread().getId()+" stopConnection() - 6";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stopConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
if ( sharedConnection.getClientCount() == 1 ) {
sharedConnection.destroy();
amqc.close();
}
}
} catch (Exception exx) {exx.printStackTrace();}
}
// Delete client's temp reply queue from AMQ Broker
if ( amqc != null && !amqc.isClosed() && !amqc.isClosing() && consumerDestination != null &&
consumerDestination instanceof ActiveMQTempDestination ) {
try {
if ( !amqc.isClosed() && !amqc.isTransportFailed()) {
amqc.deleteTempDestination((ActiveMQTempDestination)consumerDestination);
}
} catch( Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
} finally {
sharedConnection.destroy();
if ( sharedConnection != null ) {
sharedConnection.getSemaphore().release();
}
}
}
}
public void stop() {
try {
System.out.println(this.getClass().getName()+".stop() - Stopping UIMA-AS Client");
stopConnection();
super.doStop();
if (!running) {
return;
}
running = false;
if (super.serviceDelegate != null) {
// Cancel all timers and purge lists
super.serviceDelegate.cleanup();
}
if (sender != null) {
sender.doStop();
}
try {
// System.out.println(this.getClass().getName()+".stop() - Stopping UIMA-AS Client");
// stopConnection();
// Undeploy all containers
undeploy();
clientCache.clear();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO,
CLASS_NAME.getName(), "stop",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_undeployed_containers__INFO");
}
// unregister client
if (jmxManager != null) {
jmxManager.unregisterMBean(clientJmxObjectName);
jmxManager.destroy();
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
CLASS_NAME.getName(), "stop",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
}
// }
} catch( Exception e) {
e.printStackTrace();
}
}
public void setCPCMessage(Message msg) throws Exception {
msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.CollectionProcessComplete);
msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
msg.setBooleanProperty(AsynchAEMessage.RemoveEndpoint, true);
msg.setJMSReplyTo(consumerDestination);
if (msg instanceof TextMessage) {
((TextMessage) msg).setText("");
}
}
private boolean connectionClosedOrInvalid() {
SharedConnection sharedConnection = lookupConnection(brokerURI);
if (sharedConnection == null
|| sharedConnection.getConnection() == null
|| ((ActiveMQConnection) sharedConnection.getConnection())
.isClosed()
|| ((ActiveMQConnection) sharedConnection.getConnection())
.isClosing()
|| ((ActiveMQConnection) sharedConnection.getConnection())
.isTransportFailed()) {
return true;
}
return false;
}
private SharedConnection createAndInitializeAMQConnection( Semaphore semaphore, String aBrokerURI) throws Exception {
// This only effects Consumer
// Create AMQ specific connection validator. It uses
// AMQ specific approach to test the state of the connection
ActiveMQConnectionValidator connectionValidator = new ActiveMQConnectionValidator();
//Initalize the connection Factory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(aBrokerURI);
connectionFactory.setTrustAllPackages(true);
connectionFactory.setUserName(amqUser);
connectionFactory.setPassword(amqPassword);
// Create a singleton shared connection object
SharedConnection sharedConnection = sharedConnections.get(aBrokerURI);
sharedConnection.setConnectionFactory(connectionFactory);
/*
new SharedConnection(
connectionFactory,
//new ActiveMQConnectionFactory(aBrokerURI),
aBrokerURI);
sharedConnection.setSemaphore(semaphore);
*/
// Add AMQ specific connection validator
sharedConnection
.setConnectionValidator(connectionValidator);
// Connect to broker. Throws exception if unable to connect
sharedConnection.create();
addPrefetch((ActiveMQConnection) sharedConnection
.getConnection());
((ActiveMQConnection) sharedConnection.getConnection()).setExceptionListener(new ClientExceptionListener());
//System.out.println(">>>>>>>>>>>>>>>> Starting Connection to Broker:"+aBrokerURI);
sharedConnection.start();
sharedConnections.put( aBrokerURI, sharedConnection);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO,
CLASS_NAME.getName(), "createAndInitializeAMQConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_connection_setup_INFO",
new Object[] { aBrokerURI });
}
return sharedConnection;
}
protected SharedConnection createSharedConnection(String aBrokerURI) throws Exception {
Semaphore perBrokerSemaphore;
SharedConnection sharedConnection;
try {
// Use global sharedConnectionSemaphore to lock the code below to
// prevent two threads from creating per broker semaphore in case
// sharedConnection equals null.
sharedConnectionSemaphore.acquire();
// fetch shared connection object for a given broker URI
sharedConnection = sharedConnections.get(aBrokerURI);
// if null, we need create create a new one and start the AMQ connection.
// Each shared connection has its own binary semaphore to guard critical
// code.
if ( sharedConnection == null ) {
// create dedicated semaphore for this broker
perBrokerSemaphore = new Semaphore(1);
sharedConnection = new SharedConnection(null,aBrokerURI);
sharedConnection.setSemaphore(perBrokerSemaphore);
sharedConnections.put( aBrokerURI, sharedConnection);
//sharedConnection = createAndInitializeAMQConnection(perBrokerSemaphore, aBrokerURI);
} else {
// fetch dedicated semaphore from shared connectiion object
perBrokerSemaphore = sharedConnection.getSemaphore();
}
} catch( Exception e) {
throw e;
} finally {
sharedConnectionSemaphore.release();
}
try {
// Used broker specific semaphore to lock critical code
if ( !perBrokerSemaphore.tryAcquire(2, TimeUnit.SECONDS) ) {
throw new TimeoutException("UIMA-AS Client Timed Out Waiting to Acquire Broker Semaphore (2 Seconds) - Broker:"+aBrokerURI);
}
} catch( Exception e) {
throw e;
}
//System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> THREAD:"+Thread.currentThread().getId()+" LOCK: "+aBrokerURI);
try {
// check if AMQ connection is still valid
if ( connectionClosedOrInvalid() ) {
// connection either stale or not present. Create new shared connection object for this broker
sharedConnection = createAndInitializeAMQConnection(perBrokerSemaphore, aBrokerURI);
}
// Create and initialize dispatch thread with JMS Producer
initializeProducer(brokerURI, endpoint, sharedConnection.getConnection());
// Create JMS Consumer and plug in Listener for processing replies
initializeConsumer(brokerURI, sharedConnection.getConnection());
// Increment number of client instances. SharedConnection object is a static
// and is used to share a single JMS connection. The connection is closed
// when the last client finishes processing and calls stop().
if (sharedConnection != null) {
sharedConnection.registerClient(this);
}
} catch( Exception e) {
throw e;
} finally {
//System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> THREAD:"+Thread.currentThread().getId()+" UN-LOCK: "+aBrokerURI);
perBrokerSemaphore.release();
}
return sharedConnection;
}
private void addPrefetch(ActiveMQConnection aConnection) {
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(5);
((ActiveMQConnection) aConnection).setPrefetchPolicy(prefetchPolicy);
}
protected SharedConnection validateConnection(String aBrokerURI) throws Exception {
// checks if a sharedConnection exists and if not creates a new one
return createSharedConnection(aBrokerURI);
}
protected Session getSession(String aBrokerURI) throws Exception {
SharedConnection sharedConnection = validateConnection(aBrokerURI);
return getSession(sharedConnection.getConnection());
}
protected Session getSession(Connection aConnection) throws Exception {
session = aConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session;
}
protected MessageProducer lookupProducerForEndpoint(Endpoint anEndpoint) throws Exception {
if (lookupConnection(brokerURI) == null || producerSession == null) {
throw new ResourceInitializationException();
}
Destination dest = producerSession.createQueue(anEndpoint.getEndpoint());
return producerSession.createProducer(dest);
}
protected void initializeProducer(String aBrokerURI, String aQueueName) throws Exception {
// Check if a sharedConnection exists. If not it creates one
SharedConnection sharedConnection = createSharedConnection(aBrokerURI);
initializeProducer(aBrokerURI, aQueueName, sharedConnection.getConnection());
}
protected void initializeProducer(String aBrokerURI, String aQueueName, Connection aConnection)
throws Exception {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"initializeProducer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_init_jms_producer_INFO", new Object[] { aBrokerURI, aQueueName });
}
brokerURI = aBrokerURI;
// Create a worker thread for sending messages. Jms sessions are single threaded
// and it is illegal (per JMS spec) to use the same sesssion from multiple threads.
// The worker thread solves this problem. As it is the only thread that owns the
// session and uses it to create message producer.
// The worker thread blocks waiting for messages from application threads. The
// application threads add messages to the shared "queue" (in-memory queue not
// jms queue) and the worker thread consumes them. The worker thread is not
// serialializing CASes. This work is done in application threads.
// create a Message Dispatcher object. In its constructor it acquires a shared
// semaphore producerSemaphore and holds it until the producer is created an
// and initialized. Once this happens or there is an error, the semaphore is
// released.
sender = new ActiveMQMessageSender(aConnection, aQueueName, this);
producerInitialized = false;
Thread t = new Thread((BaseMessageSender) sender);
// Start the worker thread. The jms session and message producer are created. Once
// the message producer is created, the worker thread notifies this thread by
// calling onProducerInitialized() where the global flag 'producerInitialized' is
// set to true. After the notification, the worker thread notifies this instance
// that the producer is fully initialized and finally begins to wait for messages
// in pendingMessageList. Upon arrival, each message is removed from
// pendingMessageList and it is sent to a destination.
t.start();
try {
// Block waiting for the Sender to complete initializing the Producer.
// The sender will release the lock once it instantiates and initializes
// the Producer object or if there is an error
producerSemaphore.acquire();
} catch (InterruptedException ex) {
} finally {
producerSemaphore.release();
}
// Check if the worker thread failed to initialize.
if (sender.failed()) {
// Worker thread failed to initialize. Log the reason and stop the uima ee client
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"initializeProducer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_worker_thread_failed_to_initialize__WARNING",
new Object[] { sender.getReasonForFailure() });
}
stop();
return;
}
}
/**
* Create a JMS Consumer on a temporary queue. Service replies will be handled by this consumer.
*
* @param aBrokerURI
* @throws Exception
*/
protected void initializeConsumer(String aBrokerURI) throws Exception {
SharedConnection sharedConnection = createSharedConnection(aBrokerURI);
initializeConsumer(aBrokerURI, sharedConnection.getConnection());
}
protected void initializeConsumer(String aBrokerURI, Connection connection) throws Exception {
// In case we are recovering from a bad broker connection, invalidate old
// JMS objects first.
if ( consumerSession != null ) {
try {
consumer.close();
consumerSession.close();
} catch( Exception e) {
// ignore, creating a new Session below
}
}
consumerSession = getSession(connection);
consumerDestination = consumerSession.createTemporaryQueue();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"initializeConsumer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_init_jms_consumer_INFO",
new Object[] { aBrokerURI, consumerDestination.getQueueName() });
}
consumer = consumerSession.createConsumer(consumerDestination);
consumer.setMessageListener(this);
}
/**
* Replaces place holder with syntax ${pname} with a system property whose name is pname
*
* @param aPlaceholder to resolve
* @return - the actual broker URL
* @throws ResourceInitializationException
*/
private String replacePlaceholder(String aPlaceholder) throws ResourceInitializationException {
// find placeholder starting and ending positions
// If no '${' or following '}' leave as-is
int startPos = aPlaceholder.indexOf("${");
if (startPos < 0) {
return aPlaceholder;
}
int endPos = aPlaceholder.indexOf("}", startPos);
if (endPos < 0) {
return aPlaceholder;
}
// extract the name
String placeholder = aPlaceholder.substring(startPos+2, endPos);
// using the name, find the broker URL. This property must exist or exception is thrown
String url = System.getProperty(placeholder);
// the property is missing
if ( url == null ) {
throw new ResourceInitializationException(new Exception("UIMA AS Client Initialization Exception. Value for placeholder:"+placeholder+" is not defined in the system properties."));
}
return aPlaceholder.substring(0, startPos) + url + aPlaceholder.substring(endPos+1);
}
/**
* Initialize the uima ee client. Takes initialization parameters from the
* <code>anApplicationContext</code> map.
*/
public synchronized void initialize(Map anApplicationContext)
throws ResourceInitializationException {
// Add ShutdownHook to make sure the connection to the
// broker is always closed on process exit.
shutdownHookThread = new Thread(new UimaASShutdownHook(this));
Runtime.getRuntime().addShutdownHook(shutdownHookThread);
// throws an exception if verions of UIMA-AS is not compatible with UIMA SDK
VersionCompatibilityChecker.check(CLASS_NAME, "UIMA AS Client", "initialize");
/*
// Check for compatibility with a version of uima sdk. Only check major versions.
if (UimaAsVersion.getMajorVersion() != UimaVersion.getMajorVersion() ) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.WARNING,
CLASS_NAME.getName(),
"initialize",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_incompatible_version_WARNING",
new Object[] { "UIMA AS Client", UimaAsVersion.getUimajFullVersionString(),
UimaVersion.getFullVersionString() });
throw new ResourceInitializationException(new AsynchAEException(
"Version of UIMA-AS is Incompatible with a Version of UIMA Core. UIMA-AS Version is built to depend on Core UIMA version:"
+ UimaAsVersion.getUimajFullVersionString() + " but is running with version:"
+ UimaVersion.getFullVersionString()));
}
*/
if (running) {
throw new ResourceInitializationException(new UIMA_IllegalStateException());
}
reset();
Properties performanceTuningSettings = null;
if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ServerUri)) {
throw new ResourceInitializationException();
}
if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ENDPOINT)) {
throw new ResourceInitializationException();
}
ResourceManager rm = null;
if (anApplicationContext.containsKey(Resource.PARAM_RESOURCE_MANAGER)) {
rm = (ResourceManager) anApplicationContext.get(Resource.PARAM_RESOURCE_MANAGER);
} else {
rm = UIMAFramework.newDefaultResourceManager();
}
performanceTuningSettings = new Properties();
if (anApplicationContext.containsKey(UIMAFramework.CAS_INITIAL_HEAP_SIZE)) {
String cas_initial_heap_size = (String) anApplicationContext
.get(UIMAFramework.CAS_INITIAL_HEAP_SIZE);
performanceTuningSettings.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, cas_initial_heap_size);
}
asynchManager = new AsynchAECasManager_impl(rm);
brokerURI = (String) anApplicationContext.get(UimaAsynchronousEngine.ServerUri);
endpoint = (String) anApplicationContext.get(UimaAsynchronousEngine.ENDPOINT);
// Check if a placeholder is passed in instead of actual broker URL or endpoint.
// The placeholder has the syntax ${placeholderName} and may be imbedded in text.
// A system property with placeholderName must exist for successful placeholder resolution.
// Throws ResourceInitializationException if placeholder is not in the System properties.
brokerURI = replacePlaceholder(brokerURI);
endpoint = replacePlaceholder(endpoint);
clientSideJmxStats.setEndpointName(endpoint);
int casPoolSize = 1;
if (anApplicationContext.containsKey(UimaAsynchronousEngine.CasPoolSize)) {
casPoolSize = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CasPoolSize))
.intValue();
clientSideJmxStats.setCasPoolSize(casPoolSize);
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.Timeout)) {
processTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.Timeout))
.intValue();
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.GetMetaTimeout)) {
metadataTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.GetMetaTimeout))
.intValue();
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.CpcTimeout)) {
cpcTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CpcTimeout))
.intValue();
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.ApplicationName)) {
applicationName = (String) anApplicationContext.get(UimaAsynchronousEngine.ApplicationName);
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.SERIALIZATION_STRATEGY)) {
final String serializationStrategy = (String) anApplicationContext.get(UimaAsynchronousEngine.SERIALIZATION_STRATEGY);
// change this to support compressed filitered as the default
setSerialFormat((serializationStrategy.equalsIgnoreCase("xmi")) ? SerialFormat.XMI : SerialFormat.BINARY);
// setSerialFormat((serializationStrategy.equalsIgnoreCase("xmi")) ? SerialFormat.XMI : SerialFormat.COMPRESSED_FILTERED);
clientSideJmxStats.setSerialization(getSerialFormat());
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.userName)) {
amqUser = (String) anApplicationContext
.get(UimaAsynchronousEngine.userName);
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.password)) {
amqPassword = (String) anApplicationContext
.get(UimaAsynchronousEngine.password);
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.TimerPerCAS)) {
timerPerCAS = ((Boolean) anApplicationContext.get(UimaAsynchronousEngine.TimerPerCAS))
.booleanValue();
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME)
.logrb(
Level.CONFIG,
CLASS_NAME.getName(),
"initialize",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_init_uimaee_client__CONFIG",
new Object[] { brokerURI, 0, casPoolSize, processTimeout, metadataTimeout,
cpcTimeout,timerPerCAS });
}
super.serviceDelegate = new ClientServiceDelegate(endpoint, applicationName, this);
super.serviceDelegate.setCasProcessTimeout(processTimeout);
super.serviceDelegate.setGetMetaTimeout(metadataTimeout);
try {
// Generate unique identifier
String uuid = UUIDGenerator.generate();
// JMX does not allow ':' in the ObjectName so replace these with underscore
uuid = uuid.replaceAll(":", "_");
uuid = uuid.replaceAll("-", "_");
applicationName += "_" + uuid;
jmxManager = new JmxManager("org.apache.uima");
clientSideJmxStats.setApplicationName(applicationName);
clientJmxObjectName = new ObjectName("org.apache.uima:name=" + applicationName);
jmxManager.registerMBean(clientSideJmxStats, clientJmxObjectName);
// Check if sharedConnection exists. If not create a new one. The sharedConnection
// is static and shared by all instances of UIMA AS client in a jvm. The check
// is made in a critical section by first acquiring a global static semaphore to
// prevent a race condition.
createSharedConnection(brokerURI);
running = true;
// This is done to give the broker enough time to 'finalize' creation of
// temp reply queue. It's been observed (on MAC OS only) that AMQ
// broker QueueSession.createTemporaryQueue() call is not synchronous. Meaning,
// return from createTemporaryQueue() does not guarantee immediate availability
// of the temp queue. It seems like this operation is asynchronous, causing:
// "InvalidDestinationException: Cannot publish to a deleted Destination..."
// on the service side when it tries to reply to the client.
try {
wait(100);
} catch( InterruptedException e) {}
sendMetaRequest();
waitForMetadataReply();
if (abort || !running) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_aborting_as_WARNING", new Object[] { "Metadata Timeout" });
}
throw new ResourceInitializationException(new UimaASMetaRequestTimeout());
} else {
if (collectionReader != null) {
asynchManager.addMetadata(collectionReader.getProcessingResourceMetaData());
}
asynchManager.initialize(casPoolSize, "ApplicationCasPoolContext",
performanceTuningSettings);
// Create a special CasPool of size 1 to be used for deserializing CASes from a Cas
// Multiplier
if (super.resourceMetadata != null
&& super.resourceMetadata instanceof AnalysisEngineMetaData) {
if (((AnalysisEngineMetaData) super.resourceMetadata).getOperationalProperties()
.getOutputsNewCASes()) {
// Create a Shadow CAS Pool used to de-serialize CASes produced by a CAS Multiplier
asynchManager.initialize(1, SHADOW_CAS_POOL, performanceTuningSettings);
}
}
initialized = true;
remoteService = true;
// running = true;
for (int i = 0; listeners != null && i < listeners.size(); i++) {
((UimaASStatusCallbackListener) listeners.get(i)).initializationComplete(null);
}
}
} catch (ResourceInitializationException e) {
state = ClientState.FAILED;
notifyOnInitializationFailure(e);
throw e;
} catch (Exception e) {
state = ClientState.FAILED;
notifyOnInitializationFailure(e);
throw new ResourceInitializationException(e);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "initialize",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_as_initialized__INFO",
new Object[] { UimaAsynchronousEngine.SERIALIZATION_STRATEGY });
}
// Acquire cpcReady semaphore to block sending CPC request until
// ALL outstanding CASes are received.
super.acquireCpcReadySemaphore();
state = ClientState.RUNNING;
}
/**
* First generates a Spring context from a given deploy descriptor and than deploys the context
* into a Spring Container.
*
* @param aDeploymentDescriptor
* - deployment descriptor to generate Spring Context from
* @param anApplicationContext
* - a Map containing properties required by dd2spring
*
* @return - a unique spring container id
*
*/
public String deploy(String aDeploymentDescriptor, Map anApplicationContext) throws Exception {
String springContext = null;
try {
springContext = generateSpringContext(aDeploymentDescriptor, anApplicationContext);
SpringContainerDeployer springDeployer = new SpringContainerDeployer(springContainerRegistry, this);
String id = springDeployer.deploy(springContext);
if ( springDeployer.isInitialized() ) {
springDeployer.startListeners();
}
return id;
} catch (Exception e) {
running = true;
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"main", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
throw e;
} finally {
String uimaAsDebug = (String) anApplicationContext.get(UimaAsynchronousEngine.UimaEeDebug);
if ( springContext != null && (null == uimaAsDebug || uimaAsDebug.equals("") ) ) {
disposeContextFiles(springContext);
}
}
}
private void disposeContextFiles(String ...contextFiles) {
for( String contextFile: contextFiles) {
File file = new File(contextFile);
if ( file.exists()) {
file.delete();
}
}
}
/**
*
*/
public String deploy(String[] aDeploymentDescriptorList, Map anApplicationContext)
throws Exception {
if (aDeploymentDescriptorList == null) {
throw new ResourceConfigurationException(UIMA_IllegalArgumentException.ILLEGAL_ARGUMENT,
new Object[] { "Null", "DeploymentDescriptorList", "deploy()" });
}
if (aDeploymentDescriptorList.length == 0) {
throw new ResourceConfigurationException(
ResourceConfigurationException.MANDATORY_VALUE_MISSING,
new Object[] { "DeploymentDescriptorList" });
}
String[] springContextFiles = new String[aDeploymentDescriptorList.length];
for (int i = 0; i < aDeploymentDescriptorList.length; i++) {
springContextFiles[i] = generateSpringContext(aDeploymentDescriptorList[i],
anApplicationContext);
}
SpringContainerDeployer springDeployer = new SpringContainerDeployer(springContainerRegistry);
try {
String id = springDeployer.deploy(springContextFiles);
if ( springDeployer.isInitialized() ) {
springDeployer.startListeners();
}
return id;
} catch (ResourceInitializationException e) {
running = true;
throw e;
} finally {
String uimaAsDebug = (String) anApplicationContext.get(UimaAsynchronousEngine.UimaEeDebug);
if ( null == uimaAsDebug || uimaAsDebug.equals("")) {
disposeContextFiles(springContextFiles);
}
// disposeContextFiles(springContextFiles);
}
}
public void undeploy() throws Exception {
Iterator containerIterator = springContainerRegistry.keySet().iterator();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg ="undeploying "+springContainerRegistry.size()+" Containers";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
while (containerIterator.hasNext()) {
String containerId = (String) containerIterator.next();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = "Undeploying Container Id:"+containerId;
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
undeploy(containerId);
}
}
public void undeploy(String aSpringContainerId) throws Exception {
this.undeploy(aSpringContainerId, SpringContainerDeployer.STOP_NOW);
}
/**
* Undeploys Spring container with a given container Id. All deployed Spring containers are
* registered in the local registry under a unique id.
*
*/
public void undeploy(String aSpringContainerId, int stop_level) throws Exception {
if (aSpringContainerId == null ) {
return;
}
UimaEEAdminSpringContext adminContext = null;
if (!springContainerRegistry.containsKey(aSpringContainerId)) {
return;
// throw new InvalidContainerException("Invalid Spring container Id:" + aSpringContainerId +
// ". Unable to undeploy the Spring container");
}
// Fetch an administrative context which contains a Spring Container
adminContext = (UimaEEAdminSpringContext) springContainerRegistry.get(aSpringContainerId);
if (adminContext == null) {
throw new InvalidContainerException(
"Spring Container Does Not Contain Valid UimaEEAdminSpringContext Object");
}
// Fetch instance of the Container from its context
ApplicationContext ctx = adminContext.getSpringContainer();
// Query the container for objects that implement
// ControllerLifecycle interface. These
// objects are typically of type AnalysisEngineController or
// UimacppServiceController.
String[] asyncServiceList = ctx
.getBeanNamesForType(org.apache.uima.aae.controller.ControllerLifecycle.class);
// Given a valid list of controllers select the first from the list
// and
// initiate a shutdown. We don't care which controller will be
// invoked. In case of
// AggregateAnalysisEngineController the terminate event will
// propagate all the way
// to the top controller in the hierarchy and the shutdown will take
// place from there.
// If the controller is of kind UimecppServiceController or
// PrimitiveAnalysisController
// the termination logic will be immediately triggered in the
// terminate() method.
if (asyncServiceList != null && asyncServiceList.length > 0) {
boolean topLevelController = false;
ControllerLifecycle ctrer = null;
int indx = 0;
while (!topLevelController) {
ctrer = (ControllerLifecycle) ctx.getBean(asyncServiceList[indx++]);
if (ctrer instanceof UimacppServiceController
|| ((AnalysisEngineController) ctrer).isTopLevelComponent()) {
topLevelController = true;
}
}
// Send a trigger to initiate shutdown.
if (ctrer != null) {
if (ctrer instanceof AnalysisEngineController &&
((AnalysisEngineController) ctrer).getControllerLatch() != null ) {
((AnalysisEngineController) ctrer).getControllerLatch().release();
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = "++++++++++++++++++++++ calling terminate()-service:"+((AnalysisEngineController) ctrer).getComponentName();
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
switch (stop_level) {
case SpringContainerDeployer.QUIESCE_AND_STOP:
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = "++++++++++++++++++++++ calling quiesceAndStop()";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
((AnalysisEngineController) ctrer).quiesceAndStop();
break;
case SpringContainerDeployer.STOP_NOW:
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = "++++++++++++++++++++++ calling terminate()";
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
((AnalysisEngineController) ctrer).terminate();
break;
}
}
}
if (ctx instanceof FileSystemXmlApplicationContext) {
((FileSystemXmlApplicationContext) ctx).destroy();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = "---------------------- Destroying Application Context:"+((FileSystemXmlApplicationContext) ctx).getApplicationName();
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "undeploy",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
}
// Remove the container from a local registry
springContainerRegistry.remove(aSpringContainerId);
}
/**
* Use dd2spring to generate Spring context file from a given deployment descriptor file.
*
* @param aDeploymentDescriptor
* - deployment descriptor to generate Spring Context from
* @param anApplicationContext
* - a Map containing properties required by dd2spring
* @return - an absolute path to the generated Spring Context file
*
* @throws Exception
* - if failure occurs
*/
private String generateSpringContext(String aDeploymentDescriptor, Map anApplicationContext)
throws Exception {
String dd2SpringXsltFilePath = null;
String saxonClasspath = null;
if (anApplicationContext.containsKey(UimaAsynchronousEngine.DD2SpringXsltFilePath)) {
dd2SpringXsltFilePath = (String) anApplicationContext
.get(UimaAsynchronousEngine.DD2SpringXsltFilePath);
} else {
throw new ResourceConfigurationException(
ResourceConfigurationException.MANDATORY_VALUE_MISSING,
new Object[] { "Xslt File Path" });
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.SaxonClasspath)) {
saxonClasspath = (String) anApplicationContext.get(UimaAsynchronousEngine.SaxonClasspath);
} else {
throw new ResourceConfigurationException(
ResourceConfigurationException.MANDATORY_VALUE_MISSING,
new Object[] { "Saxon Classpath" });
}
Dd2spring dd2Spring = new Dd2spring();
File springContextFile = dd2Spring.convertDd2Spring(aDeploymentDescriptor,
dd2SpringXsltFilePath, saxonClasspath, (String) anApplicationContext
.get(UimaAsynchronousEngine.UimaEeDebug));
return springContextFile.getAbsolutePath();
}
/**
* Deploys provided context files ( and beans) in a new Spring container.
*
*/
protected String deploySpringContainer(String[] springContextFiles)
throws ResourceInitializationException {
SpringContainerDeployer springDeployer = new SpringContainerDeployer(this);
try {
return springDeployer.deploy(springContextFiles);
} catch (ResourceInitializationException e) {
// turn on the global flag so that the stop() can do the cleanup
running = true;
throw e;
}
}
protected void waitForServiceNotification() throws Exception {
while (!serviceInitializationCompleted) {
if (serviceInitializationException) {
throw new ResourceInitializationException();
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"waitForServiceNotification", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_awaiting_container_init__INFO", new Object[] {});
}
try {
serviceSemaphore.acquire();
} catch (InterruptedException e) {
} finally {
serviceSemaphore.release();
}
if (serviceInitializationException) {
throw new ResourceInitializationException();
}
}
}
protected void deployEmbeddedBroker() throws Exception {
// TBI
}
public static void main(String[] args) {
try {
BaseUIMAAsynchronousEngineCommon_impl uimaee = new BaseUIMAAsynchronousEngine_impl();
Map appContext = new HashMap();
appContext.put(UimaAsynchronousEngine.DD2SpringXsltFilePath, args[1]);
appContext.put(UimaAsynchronousEngine.SaxonClasspath, args[2]);
String containerId = uimaee.deploy(args[0], appContext); // args[1],
// args[2]);
uimaee.undeploy(containerId);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"main", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
}
}
public void setReleaseCASMessage(TextMessage msg, String aCasReferenceId) throws Exception {
msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.ReleaseCAS);
msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
msg.setJMSReplyTo(consumerDestination);
}
public void notifyOnInitializationFailure(Exception e) {
notifyOnInitializationFailure(null, e);
}
public void notifyOnInitializationSuccess() {
notifyOnInitializationSuccess(null);
}
public void notifyOnInitializationFailure(AnalysisEngineController aController, Exception e) {
// Initialization exception. Notify blocking thread and indicate a problem
serviceInitializationException = true;
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_container_init_exception__WARNING", e);
}
serviceSemaphore.release();
}
public void notifyOnInitializationSuccess(AnalysisEngineController aController) {
serviceInitializationCompleted = true;
serviceSemaphore.release();
}
public void notifyOnTermination(String aServiceName, EventTrigger cause) {
for (int i = 0; listeners != null && i < listeners.size(); i++) {
UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
statCL.onUimaAsServiceExit(cause);
}
}
public void notifyOnTermination(String aServiceName, String aCasReferenceId, Exception cause) {
// super.n
}
protected MessageProducer getMessageProducer(Destination destination) throws Exception {
return sender.getMessageProducer(destination);
}
/**
* Request Uima AS client to initiate sending Stop requests to a service for all outstanding CASes
* awaiting reply.
*
*/
public void stopProducingCases() {
String[] casIdsPendingReply = serviceDelegate.getDelegateCasIdsPendingReply();
if ( casIdsPendingReply != null && casIdsPendingReply.length > 0 ) {
for( String casReferenceId : casIdsPendingReply ) {
// The Cas is still being processed
ClientRequest clientCachedRequest = (ClientRequest) clientCache
.get(casReferenceId);
if (clientCachedRequest != null && !clientCachedRequest.isMetaRequest()
&& clientCachedRequest.getCasReferenceId() != null) {
stopProducingCases(casReferenceId, clientCachedRequest.getFreeCasNotificationQueue());
}
}
}
}
/**
* Request Uima AS client to initiate sending Stop request to a service for a given CAS id If the
* service is a Cas Multiplier, it will stop producing new CASes, will wait until all child CASes
* finish and finally returns the input CAS.
*
*/
public void stopProducingCases(String aCasReferenceId) {
// The Cas is still being processed
ClientRequest clientCachedRequest = (ClientRequest) clientCache.get(aCasReferenceId);
if (clientCachedRequest != null) {
stopProducingCases(aCasReferenceId, clientCachedRequest.getFreeCasNotificationQueue());
}
}
// private void stopProducingCases(ClientRequest clientCachedRequest) {
private void stopProducingCases(String casReferenceId, Destination cmFreeCasQueue) {
try {
// if (clientCachedRequest.getFreeCasNotificationQueue() != null) {
if (cmFreeCasQueue != null) {
TextMessage msg = createTextMessage();
msg.setText("");
msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
// msg.setStringProperty(AsynchAEMessage.CasReference, clientCachedRequest.getCasReferenceId());
msg.setStringProperty(AsynchAEMessage.CasReference, casReferenceId);
msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Stop);
msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
try {
MessageProducer msgProducer = getMessageProducer(cmFreeCasQueue);
if (msgProducer != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_sending_stop_to_service__INFO", new Object[] {casReferenceId,cmFreeCasQueue});
}
// Send STOP message to Cas Multiplier Service
msgProducer.send(msg);
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_unable_to_send_stop_to_cm__WARNING");
}
}
} catch (Exception ex) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING",
ex);
}
}
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
}
}
protected MessageSender getDispatcher() {
return sender;
}
public void notifyOnReconnecting(String aMessage) {
}
public void notifyOnReconnectionSuccess() {
}
public void onApplicationEvent(ApplicationEvent event) {
for (int i = 0; listeners != null && i < listeners.size(); i++) {
UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
if ( event instanceof UimaASApplicationExitEvent) {
statCL.onUimaAsServiceExit( ((UimaASApplicationExitEvent)event).getEventTrigger());
}
}
}
public class ClientExceptionListener implements ExceptionListener {
@Override
public void onException(JMSException arg0) {
arg0.printStackTrace();
}
}
}