blob: c8f40e5511119a459ed012c7ddfbf040777befeb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.adapter.jms.client;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.uima.UIMAFramework;
import org.apache.uima.UIMARuntimeException;
import org.apache.uima.UIMA_IllegalStateException;
import org.apache.uima.aae.AsynchAECasManager;
import org.apache.uima.aae.UIDGenerator;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaSerializer;
import org.apache.uima.aae.client.UimaASProcessStatus;
import org.apache.uima.aae.client.UimaASProcessStatusImpl;
import org.apache.uima.aae.client.UimaASStatusCallbackListener;
import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.delegate.Delegate.DelegateEntry;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.InvalidMessageException;
import org.apache.uima.aae.error.MessageTimeoutException;
import org.apache.uima.aae.error.ServiceShutdownException;
import org.apache.uima.aae.error.UimaASCollectionProcessCompleteTimeout;
import org.apache.uima.aae.error.UimaASMetaRequestTimeout;
import org.apache.uima.aae.error.UimaASPingTimeout;
import org.apache.uima.aae.error.UimaASProcessCasTimeout;
import org.apache.uima.aae.error.UimaEEServiceException;
import org.apache.uima.aae.jmx.UimaASClientInfo;
import org.apache.uima.aae.jmx.UimaASClientInfoMBean;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
import org.apache.uima.adapter.jms.ConnectionValidator;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.message.PendingMessage;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.SerialFormat;
import org.apache.uima.cas.impl.AllowPreexistingFS;
import org.apache.uima.cas.impl.BinaryCasSerDes6;
import org.apache.uima.cas.impl.CASImpl;
import org.apache.uima.cas.impl.Serialization;
import org.apache.uima.cas.impl.TypeSystemImpl;
import org.apache.uima.cas.impl.XmiSerializationSharedData;
import org.apache.uima.cas.impl.BinaryCasSerDes6.ReuseInfo;
import org.apache.uima.collection.CollectionReader;
import org.apache.uima.collection.EntityProcessStatus;
import org.apache.uima.jms.error.handler.BrokerConnectionException;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.resourceSpecifier.factory.SerializationStrategy;
import org.apache.uima.util.Level;
import org.apache.uima.util.ProcessTrace;
import org.apache.uima.util.XMLInputSource;
import org.apache.uima.util.impl.ProcessTrace_impl;
public abstract class BaseUIMAAsynchronousEngineCommon_impl implements UimaAsynchronousEngine,
MessageListener {
private static final Class CLASS_NAME = BaseUIMAAsynchronousEngineCommon_impl.class;
public enum ClientState {INITIALIZING, RUNNING, FAILED, RECONNECTING, STOPPING, STOPPED};
protected ClientState state = ClientState.INITIALIZING;
protected String brokerURI = null;
protected static final String SHADOW_CAS_POOL = "ShadowCasPool";
protected static final int MetadataTimeout = 1;
protected static final int CpCTimeout = 2;
protected static final int ProcessTimeout = 3;
protected static final int PingTimeout = 4;
protected volatile boolean initialized;
protected List listeners = new ArrayList();
protected AsynchAECasManager asynchManager;
protected boolean remoteService = false;
protected CollectionReader collectionReader = null;
protected volatile boolean running = false;
protected ProcessingResourceMetaData resourceMetadata;
protected CAS sendAndReceiveCAS = null;
protected UIDGenerator idGenerator = new UIDGenerator();
protected ConcurrentHashMap<String, ClientRequest> clientCache = new ConcurrentHashMap<String, ClientRequest>();
protected ConcurrentHashMap<Long, ThreadMonitor> threadMonitorMap = new ConcurrentHashMap<Long, ThreadMonitor>();
// Default timeout for ProcessCas requests
protected int processTimeout = 0;
// Default timeout for GetMeta requests
protected int metadataTimeout = 60000;
// Default timeout for CpC requests is no timeout
protected int cpcTimeout = 0;
protected volatile boolean abort = false;
protected static final String uniqueIdentifier = String.valueOf(System.nanoTime());
protected Exception exc;
// Counter maintaining a number of CASes sent to a service. The counter
// is incremented every time a CAS is sent and decremented when the CAS
// reply is received. It is also adjusted down in case of a timeout or
// error.
protected AtomicLong outstandingCasRequests = new AtomicLong();
protected AtomicLong totalCasRequestsSentBetweenCpCs = new AtomicLong();
protected ConcurrentHashMap springContainerRegistry = new ConcurrentHashMap();
protected MessageConsumer consumer = null;
protected SerialFormat serialFormat = SerialFormat.XMI;
protected TypeSystemImpl remoteTypeSystem; // of the remote service, for filtered binary compression
protected UimaASClientInfoMBean clientSideJmxStats = new UimaASClientInfo();
private UimaSerializer uimaSerializer = new UimaSerializer();
protected ClientServiceDelegate serviceDelegate = null;
private Object stopMux = new Object();
private Object sendMux = new Object();
private BlockingQueue<CasQueueEntry> threadQueue = new LinkedBlockingQueue<CasQueueEntry>();
private ConcurrentHashMap<Long, CasQueueEntry> threadRegistrar = new ConcurrentHashMap<Long, CasQueueEntry>();
private volatile boolean casQueueProducerReady;
private Object casProducerMux = new Object();
protected BlockingQueue<PendingMessage> pendingMessageQueue = new LinkedBlockingQueue<PendingMessage>();
// Create Semaphore that will signal when the producer object is initialized
protected Semaphore producerSemaphore = new Semaphore(1);
// Create Semaphore that will signal when CPC reply has been received
protected Semaphore cpcSemaphore = new Semaphore(1);
// Create Semaphore that will signal when GetMeta reply has been received
protected Semaphore getMetaSemaphore = new Semaphore(0, true);
// Signals when the client is ready to send CPC request
protected Semaphore cpcReadySemaphore = new Semaphore(1);
// Signals receipt of a CPC reply
protected Semaphore cpcReplySemaphore = new Semaphore(1);
protected volatile boolean producerInitialized;
protected static ConcurrentHashMap<String, SharedConnection> sharedConnections =
new ConcurrentHashMap<String, SharedConnection>();
//protected static SharedConnection sharedConnection = null;
protected Thread shutdownHookThread = null;
private ExecutorService exec = Executors.newFixedThreadPool(1);
private volatile boolean casMultiplierDelegate;
abstract public String getEndPointName() throws Exception;
abstract protected TextMessage createTextMessage() throws Exception;
abstract protected BytesMessage createBytesMessage() throws Exception;
abstract protected void setMetaRequestMessage(Message msg) throws Exception;
abstract protected void setCASMessage(String casReferenceId, CAS aCAS, Message msg)
throws Exception;
abstract protected void setCASMessage(String casReferenceId, String aSerializedCAS, Message msg)
throws Exception;
abstract protected void setCASMessage(String casReferenceId, byte[] aSerializedCAS, Message msg)
throws Exception;
abstract public void setCPCMessage(Message msg) throws Exception;
abstract public void initialize(Map anApplicationContext) throws ResourceInitializationException;
abstract protected void cleanup() throws Exception;
abstract public String deploy(String[] aDeploymentDescriptorList, Map anApplicationContext)
throws Exception;
abstract protected String deploySpringContainer(String[] springContextFiles)
throws ResourceInitializationException;
abstract protected MessageSender getDispatcher();
abstract protected void initializeConsumer(String aBrokerURI, Connection connection) throws Exception;
// enables/disable timer per CAS. Defaul is to use single timer for
// all outstanding CASes
protected volatile boolean timerPerCAS=false;
//abstract protected String getBrokerURI();
protected void setBrokeryURI(String brokerURI ) {
this.brokerURI = brokerURI;
}
protected String getBrokerURI() {
return brokerURI;
}
public void addStatusCallbackListener(UimaAsBaseCallbackListener aListener) {
if (running) {
throw new UIMA_IllegalStateException(JmsConstants.JMS_LOG_RESOURCE_BUNDLE,"UIMAJMS_listener_added_after_initialize__WARNING", new Object[]{});
}
listeners.add(aListener);
}
public SerialFormat getSerialFormat() {
return serialFormat;
}
protected void setSerialFormat(SerialFormat aSerialFormat) {
serialFormat = aSerialFormat;
}
public TypeSystemImpl getRemoteTypeSystem() {
return remoteTypeSystem;
}
protected void setRemoteTypeSystem(TypeSystemImpl remoteTypeSystem) {
this.remoteTypeSystem = remoteTypeSystem;
}
/**
* Serializes a given CAS.
*
* @param aCAS
* - CAS to serialize
* @return - serialized CAS
*
* @throws Exception
*/
protected String serializeCAS(CAS aCAS, XmiSerializationSharedData serSharedData)
throws Exception {
return uimaSerializer.serializeCasToXmi(aCAS, serSharedData);
}
protected String serializeCAS(CAS aCAS) throws Exception {
XmiSerializationSharedData serSharedData = new XmiSerializationSharedData();
return uimaSerializer.serializeCasToXmi(aCAS, serSharedData);
}
public void removeStatusCallbackListener(UimaAsBaseCallbackListener aListener) {
listeners.remove(aListener);
}
public void onBeforeMessageSend(UimaASProcessStatus status) {
try {
for (int i = 0; listeners != null && i < listeners.size(); i++) {
UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
statCL.onBeforeMessageSend(status);
}
} catch( Throwable t) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"onBeforeMessageSend", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", t);
}
}
public void onBeforeProcessCAS(UimaASProcessStatus status, String nodeIP, String pid) {
for (int i = 0; listeners != null && i < listeners.size(); i++) {
UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
try {
statCL.onBeforeProcessCAS(status, nodeIP, pid);
} catch( Throwable t) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"onBeforeProcessCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", t);
}
}
}
public void onBeforeProcessMeta(String nodeIP, String pid) {
for (int i = 0; listeners != null && i < listeners.size(); i++) {
UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners
.get(i);
try {
statCL.onBeforeProcessMeta(nodeIP, pid);
} catch (Throwable t) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
getClass().getName(), "onBeforeProcessMeta",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", t);
}
}
}
public synchronized void setCollectionReader(CollectionReader aCollectionReader)
throws ResourceInitializationException {
if (initialized) {
// Uima ee client has already been initialized. CR should be
// set before calling initialize()
throw new ResourceInitializationException();
}
collectionReader = aCollectionReader;
}
private void addMessage(PendingMessage msg) {
pendingMessageQueue.add(msg);
}
protected void acquireCpcReadySemaphore() {
try {
// Acquire cpcReady semaphore to block sending CPC request until
// ALL outstanding CASes are received.
cpcReadySemaphore.acquire();
} catch (InterruptedException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_interrupted_while_acquiring_cpcReadySemaphore__WARNING", new Object[] {});
}
}
}
public synchronized void collectionProcessingComplete() throws ResourceProcessException {
try {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_app_cpc_request_FINEST", new Object[] {});
}
if (outstandingCasRequests.get() > 0) {
UIMAFramework.getLogger(CLASS_NAME)
.logrb(
Level.INFO,
CLASS_NAME.getName(),
"collectionProcessingComplete",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_in_cpc_INFO",
new Object[] { outstandingCasRequests.get(),
totalCasRequestsSentBetweenCpCs.get() });
}
// If the client was initialized but never sent any CASes its cpcReadySemaphore
// must be first explicitly released to enable the code to send CPC to a service.
// The semaphore is initially acquired in the initialize(Map) method and typically
// released when the number of CASes sent equals the number of CASes received. Since
// no CASes were sent we must do the release here to be able to continue.
if (totalCasRequestsSentBetweenCpCs.get() == 0 && !serviceDelegate.isAwaitingPingReply()) {
cpcReadySemaphore.release();
}
// The cpcReadySemaphore was initially acquired in the initialize() method
// so below we wait until ALL CASes are processed. Once all
// CASes are received the semaphore will be released
acquireCpcReadySemaphore();
serviceDelegate.cancelDelegateTimer();
if (!running) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_cpc_request_not_done_INFO", new Object[] {});
}
return;
}
ClientRequest requestToCache = new ClientRequest(uniqueIdentifier, this); // , timeout);
requestToCache.setIsRemote(remoteService);
requestToCache.setCPCRequest(true);
requestToCache.setCpcTimeout(cpcTimeout);
requestToCache.setEndpoint(getEndPointName());
clientCache.put(uniqueIdentifier, requestToCache);
PendingMessage msg = new PendingMessage(AsynchAEMessage.CollectionProcessComplete);
if (cpcTimeout > 0) {
requestToCache.startTimer();
msg.put(UimaAsynchronousEngine.CpcTimeout, String.valueOf(cpcTimeout));
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_started_cpc_request_timer_FINEST", new Object[] {});
}
// Add CPC message to the pending queue
addMessage(msg);
// Acquire cpc semaphore. When a CPC reply comes or there is a timeout or the client
// is stopped, the semaphore will be released.
try {
cpcReplySemaphore.acquire();
} catch (InterruptedException ex) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_interrupted_while_acquiring_cpcReplySemaphore__WARNING", new Object[] {});
}
}
// Wait for CPC Reply. This blocks on the cpcReplySemaphore
waitForCpcReply();
totalCasRequestsSentBetweenCpCs.set(0); // reset number of CASes sent to a service
cancelTimer(uniqueIdentifier);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_cancelled_cpc_request_timer_FINEST", new Object[] {});
}
if (running) {
for (int i = 0; listeners != null && i < listeners.size(); i++) {
((UimaASStatusCallbackListener) listeners.get(i)).collectionProcessComplete(null);
}
}
} catch (Exception e) {
throw new ResourceProcessException(e);
}
}
private void releaseCacheEntries() {
Iterator it = clientCache.keySet().iterator();
while (it.hasNext()) {
ClientRequest entry = clientCache.get((String) it.next());
if (entry != null && entry.getCAS() != null) {
entry.getCAS().release();
}
}
}
private void clearThreadRegistrar() {
Iterator it = threadRegistrar.keySet().iterator();
while (it.hasNext()) {
Long key = (Long) it.next();
CasQueueEntry entry = threadRegistrar.get(key);
if (entry != null) {
entry.clear();
}
}
}
public void doStop() {
synchronized (stopMux) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_as_client_INFO",
new Object[] {});
}
if (!running) {
return;
}
exec.shutdownNow();
casQueueProducerReady = false;
if (serviceDelegate != null) {
serviceDelegate.cancelDelegateTimer();
serviceDelegate.cancelDelegateGetMetaTimer();
}
try {
try {
clearThreadRegistrar();
releaseCacheEntries();
} catch (Exception ex) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"stop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", ex);
}
}
// Unblock threads
if (threadMonitorMap.size() > 0) {
Iterator it = threadMonitorMap.keySet().iterator();
while (it.hasNext()) {
long key = ((Long) it.next()).longValue();
ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(key);
if (threadMonitor == null || threadMonitor.getMonitor() == null) {
continue;
}
threadMonitor.getMonitor().release();
}
}
cpcReadySemaphore.release();
outstandingCasRequests.set(0); // reset global counter of outstanding requests
cpcReplySemaphore.release();
getMetaSemaphore.release();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_as_client_INFO",
new Object[] {});
}
for (Iterator i = springContainerRegistry.entrySet().iterator(); i.hasNext();) {
Map.Entry entry = (Map.Entry) i.next();
Object key = entry.getKey();
undeploy((String) key);
}
asynchManager = null;
springContainerRegistry.clear();
listeners.clear();
clientCache.clear();
threadQueue.clear();
// Add empty CasQueueEntry object to the queue so that we wake up a reader thread which
// may be sitting in the threadQueue.take() method. The reader will
// first check the state of the 'running' flag and find it false which
// will cause the reader to exit run() method
threadQueue.add(new CasQueueEntry());
threadRegistrar.clear();
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"stop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
}
} finally {
// Shutdown hook has been previously added in case an application 'forgets'
// to call stop. Since we are in the stop() method, the hook is no longer
// needed. Catch IllegalStateException which is thrown by JVM if it is already
// in the process of shutting down
if ( shutdownHookThread != null ) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownHookThread);
} catch( IllegalStateException e) {}
}
}
}
}
/**
* This method spins a thread where CASes are distributed to requesting threads in an orderly
* fashion. CASes are distributed among the threads based on FIFO. Oldest waiting thread receive
* the CAS first.
*
*/
private void serveCASes() {
synchronized (casProducerMux) {
if (casQueueProducerReady) {
return; // Only one CAS producer thread is needed/allowed
}
casQueueProducerReady = true;
}
// Spin a CAS producer thread
new Thread() {
public void run() {
// terminate when the client API is stopped
while (running) {
try {
// Remove the oldest CAS request from the queue.
// Every thread requesting a CAS adds an entry to this
// queue.
CasQueueEntry entry = threadQueue.take();
CAS cas = null;
long startTime = System.nanoTime();
// Wait for a free CAS instance
if (!running || asynchManager == null) {
return; // client API has been stopped
}
if (remoteService) {
cas = asynchManager.getNewCas("ApplicationCasPoolContext");
} else {
cas = asynchManager.getNewCas();
}
long waitingTime = System.nanoTime() - startTime;
clientSideJmxStats.incrementTotalTimeWaitingForCas(waitingTime);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"serveCASes.run()",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_new_cas_FINEST",
new Object[] { "Time Waiting for CAS",
(double) waitingTime / (double) 1000000 });
}
if (running) { // only if the client is still running handle the new cas
// Assigns a CAS and releases a semaphore
entry.setCas(cas);
} else {
return; // Client is terminating
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"serveCASes.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
}
}
}
}
}.start();
}
/**
* Returns a CAS. If multiple threads call this method, the order of each request is preserved.
* The oldest waiting thread receives the CAS. Each request for a CAS is queued, and when the CAS
* becomes available the oldest waiting thread will receive it for processing.
*/
public CAS getCAS() throws Exception {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "getCAS",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_request_for_cas_FINEST",
new Object[] {});
}
if (!running) {
throw new RuntimeException("Uima AS Client Is Stopping");
}
if (!initialized) {
throw new ResourceInitializationException();
}
// Spin a thread that fetches CASes from the CAS Pool
if (!casQueueProducerReady) {
serveCASes(); // start CAS producer thread
}
// Each thread has an entry in the map. The entry is created in the
// map once and cached.
CasQueueEntry entry = getQueueEntry(Thread.currentThread().getId());
// Add this thread entry to the queue of threads waiting for a CAS
threadQueue.add(entry);
if (entry != null) {
while (running) {
CAS cas = null;
// getCas() blocks until a CAS is set in the CAS producer thread
// ( see serveCASes() above).
// If the client is stopped, the cleanup code will force getCas()
// to release a semaphore to prevent a hang.
if ( (cas = entry.getCas()) == null) {
break;
} else {
// We may have been waiting in entry.getCas() for awhile
// so first check the status of the client. If is not running
// the semaphore in getCas() was forcefully released to prevent
// a hang. If we are stopping just return null.
if ( !running ) {
break;
}
return cas;
}
} // while
}
return null; // client has terminated
}
private CasQueueEntry getQueueEntry(long aThreadId) {
CasQueueEntry entry = null;
if (threadRegistrar.containsKey(aThreadId)) {
entry = threadRegistrar.get(aThreadId);
} else {
// Creates a new instance and acquires a semaphore. The semaphore
// is released in the CasQueueEntry.setCAS()
entry = new CasQueueEntry();
threadRegistrar.put(aThreadId, entry);
}
return entry;
}
protected void reset() {
}
/**
* This class manages access to a CAS. A CAS is assigned
* to an instance of this class in the CAS producer thread and
* it is consumed in the getCAS() method. Access to a CAS
* is protected by a semaphore.
*
*/
private static class CasQueueEntry {
private CAS cas;
private Semaphore semaphore = null;
public CasQueueEntry() {
semaphore = new Semaphore(1);
// Acquire the semaphore to force getCAS() to block
// until setCAS() or clear() is called
semaphore.acquireUninterruptibly();
}
public CAS getCas() {
// Wait until setCAS() is called
semaphore.acquireUninterruptibly();
return cas;
}
public void setCas(CAS cas) {
this.cas = cas;
// Release semaphore so that getCAS() can return a CAS instance
semaphore.release();
}
// Called when the client is stopping to
// release a semaphore
public void clear() {
cas = null;
semaphore.release();
}
}
protected void sendMetaRequest() throws Exception {
PendingMessage msg = new PendingMessage(AsynchAEMessage.GetMeta);
ClientRequest requestToCache = new ClientRequest(uniqueIdentifier, this); // , metadataTimeout);
requestToCache.setIsRemote(remoteService);
requestToCache.setMetaRequest(true);
requestToCache.setMetadataTimeout(metadataTimeout);
requestToCache.setEndpoint(getEndPointName());
clientCache.put(uniqueIdentifier, requestToCache);
// Add message to the pending queue
addMessage(msg);
}
protected void waitForCpcReply() {
try {
// wait for CPC reply
cpcReplySemaphore.acquire();
} catch (InterruptedException e) {
} finally {
cpcReplySemaphore.release();
}
}
/**
* Blocks while trying to acquire a semaphore awaiting receipt of GetMeta Reply. When the GetMeta
* is received, or there is a timeout, or the client stops the semaphore will be released.
*/
protected void waitForMetadataReply() {
try {
getMetaSemaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String getPerformanceReport() {
return null;
}
public synchronized void process() throws ResourceProcessException {
if (!initialized) {
throw new ResourceProcessException();
}
if (collectionReader == null) {
throw new ResourceProcessException();
}
if (!casQueueProducerReady) {
serveCASes(); // start CAS producer thread
}
CAS cas = null;
boolean hasNext = true;
while (initialized && running ) {
try {
if ( (hasNext = collectionReader.hasNext()) == true) {
cas = getCAS();
collectionReader.getNext(cas);
sendCAS(cas);
} else {
break;
}
} catch (Exception e) {
e.printStackTrace();
throw new ResourceProcessException(e);
}
}
// If the CR is done, enter a polling loop waiting for outstanding CASes to return
// from a service
if (hasNext == false ) {
Object mObject = new Object();
// if client is running and there are outstanding CASe go sleep for awhile and
// try again, until all CASes come back from a service
while ( running && serviceDelegate.getCasPendingReplyListSize() > 0 ) {
synchronized(mObject) {
try {
mObject.wait(100);
} catch( Exception e) {
e.printStackTrace();
throw new ResourceProcessException(e);
}
}
}
collectionProcessingComplete();
}
}
protected ConcurrentHashMap<String, ClientRequest> getCache() {
return clientCache;
}
/**
* Sends a given CAS for analysis to the UIMA EE Service.
*
*/
private String sendCAS(CAS aCAS, ClientRequest requestToCache) throws ResourceProcessException {
synchronized (sendMux) {
if ( requestToCache == null ) {
throw new ResourceProcessException(new Exception("Invalid Process Request. Cache Entry is Null"));
}
String casReferenceId = requestToCache.getCasReferenceId();
try {
if (!running) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendCAS",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_sending_cas_INFO",
new Object[] { "Asynchronous Client is Stopping" });
}
return null;
}
clientCache.put(casReferenceId, requestToCache);
PendingMessage msg = new PendingMessage(AsynchAEMessage.Process);
long t1 = System.nanoTime();
switch (serialFormat) {
case XMI:
XmiSerializationSharedData serSharedData = new XmiSerializationSharedData();
String serializedCAS = serializeCAS(aCAS, serSharedData);
msg.put(AsynchAEMessage.CAS, serializedCAS);
if (remoteService) { // always true 5/2013
// Store the serialized CAS in case the timeout occurs and need to send the
// the offending CAS to listeners for reporting
requestToCache.setCAS(serializedCAS);
requestToCache.setXmiSerializationSharedData(serSharedData);
}
break;
case BINARY:
byte[] serializedBinaryCAS = uimaSerializer.serializeCasToBinary(aCAS);
msg.put(AsynchAEMessage.CAS, serializedBinaryCAS);
break;
case COMPRESSED_FILTERED:
// can't use uimaserializer directly - project doesn't have ref to this one
// for storing the reuse info
BinaryCasSerDes6 bcs = new BinaryCasSerDes6(aCAS, this.getRemoteTypeSystem());
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
bcs.serialize(baos);
requestToCache.setCompress6ReuseInfo(bcs.getReuseInfo());
msg.put(AsynchAEMessage.CAS, baos.toByteArray());
break;
default:
throw new UIMARuntimeException(new Exception("Internal Error"));
}
requestToCache.setCAS(aCAS);
requestToCache.setSerializationTime(System.nanoTime() - t1);
msg.put(AsynchAEMessage.CasReference, casReferenceId);
requestToCache.setIsRemote(remoteService);
requestToCache.setEndpoint(getEndPointName());
requestToCache.setProcessTimeout(processTimeout);
requestToCache.clearTimeoutException();
// The sendCAS() method is synchronized no need to synchronize the code below
if (serviceDelegate.getState() == Delegate.TIMEOUT_STATE ) {
SharedConnection sharedConnection = lookupConnection(getBrokerURI());
// Send Ping to service as getMeta request
if ( !serviceDelegate.isAwaitingPingReply() && sharedConnection.isOpen() ) {
serviceDelegate.setAwaitingPingReply();
// Add the cas to a list of CASes pending reply. Also start the timer if necessary
// serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
// since the service is in time out state, we dont send CASes to it just yet. Instead, place
// a CAS in a pending dispatch list. CASes from this list will be sent once a response to PING
// arrives.
serviceDelegate.addCasToPendingDispatchList(requestToCache.getCasReferenceId(), aCAS.hashCode(), timerPerCAS);
if ( cpcReadySemaphore.availablePermits() > 0 ) {
acquireCpcReadySemaphore();
}
// Send PING Request to check delegate's availability
sendMetaRequest();
// @@@@@@@@@@@@@@@ Changed on 4/20 serviceDelegate.cancelDelegateTimer();
// Start a timer for GetMeta ping and associate a cas id
// with this timer. The delegate is currently in a timed out
// state due to a timeout on a CAS with a given casReferenceId.
//
serviceDelegate.startGetMetaRequestTimer(casReferenceId);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendCAS",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_ping__FINE",
new Object[] { serviceDelegate.getKey() });
}
return casReferenceId;
} else {
if ( !requestToCache.isSynchronousInvocation() && !sharedConnection.isOpen() ) {
Exception exception = new BrokerConnectionException("Unable To Deliver CAS:"+requestToCache.getCasReferenceId()+" To Destination. Connection To Broker "+getBrokerURI()+" Has Been Lost");
handleException(exception, requestToCache.getCasReferenceId(), null, requestToCache, true);
return casReferenceId;
} else {
// Add to the outstanding list.
// serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
// since the service is in time out state, we dont send CASes to it just yet. Instead, place
// a CAS in a pending dispatch list. CASes from this list will be sent once a response to PING
// arrives.
serviceDelegate.addCasToPendingDispatchList(requestToCache.getCasReferenceId(), aCAS.hashCode(), timerPerCAS);
return casReferenceId;
}
}
}
SharedConnection sharedConnection = lookupConnection(getBrokerURI());
if ( !sharedConnection.isOpen() ) {
if (requestToCache != null && !requestToCache.isSynchronousInvocation() && aCAS != null ) {
aCAS.release();
}
throw new ResourceProcessException(new BrokerConnectionException("Unable To Deliver Message To Destination. Connection To Broker "+sharedConnection.getBroker()+" Has Been Lost"));
}
// Incremented number of outstanding CASes sent to a service. When a reply comes
// this counter is decremented
outstandingCasRequests.incrementAndGet();
// Increment total number of CASes sent to a service. This is reset
// on CPC
totalCasRequestsSentBetweenCpCs.incrementAndGet();
// Add message to the pending queue
addMessage(msg);
} catch (ResourceProcessException e) {
clientCache.remove(casReferenceId);
throw e;
} catch (Exception e) {
clientCache.remove(casReferenceId);
throw new ResourceProcessException(e);
}
return casReferenceId;
}
}
/**
* Checks the state of a delegate to see if it is in TIMEOUT State. If it is, push the CAS id onto
* a list of CASes pending dispatch. The delegate is in a questionable state and the aggregate
* sends a ping message to check delegate's availability. If the delegate responds to the ping,
* all CASes in the pending dispatch list will be immediately dispatched.
**/
public boolean delayCasIfDelegateInTimedOutState(String aCasReferenceId, long casHashcode) throws AsynchAEException {
if (serviceDelegate != null && serviceDelegate.getState() == Delegate.TIMEOUT_STATE) {
// Add CAS id to the list of delayed CASes.
serviceDelegate.addCasToPendingDispatchList(aCasReferenceId, casHashcode, timerPerCAS);
return true;
}
return false; // Cas Not Delayed
}
private ClientRequest produceNewClientRequestObject() {
String casReferenceId = idGenerator.nextId();
return new ClientRequest(casReferenceId, this);
}
/**
* Sends a given CAS for analysis to the UIMA EE Service.
*
*/
public synchronized String sendCAS(CAS aCAS) throws ResourceProcessException {
if ( !running ) {
throw new ResourceProcessException(new UimaEEServiceException("Uima AS Client Has Been Stopped. Rejecting Request to Process CAS"));
}
return this.sendCAS(aCAS, produceNewClientRequestObject());
}
/**
* Handles response to CollectionProcessComplete request.
*
* @throws Exception
*/
protected void handleCollectionProcessCompleteReply(Message message) throws Exception {
int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
try {
if (AsynchAEMessage.Exception == payload) {
ProcessTrace pt = new ProcessTrace_impl();
UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
Exception exception = retrieveExceptionFromMessage(message);
status.addEventStatus("CpC", "Failed", exception);
notifyListeners(null, status, AsynchAEMessage.CollectionProcessComplete);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"handleCollectionProcessCompleteReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_received_exception_msg_INFO",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
getBrokerURI(),
message.getStringProperty(AsynchAEMessage.CasReference), exception });
}
} else {
// After receiving CPC reply there may be cleanup to do. Delegate this
// to platform specific implementation (ActiveMQ or WAS)
cleanup();
}
} catch (Exception e) {
throw e;
} finally {
// Release the semaphore acquired in collectionProcessingComplete()
cpcReplySemaphore.release();
}
}
/**
* Handles response to GetMeta Request. Deserializes ResourceMetaData and initializes CasManager.
*
* @param message
* - jms message containing serialized ResourceMetaData
*
* @throws Exception
*/
protected void handleMetadataReply(Message message) throws Exception {
serviceDelegate.cancelDelegateGetMetaTimer();
serviceDelegate.setState(Delegate.OK_STATE);
// check if the reply msg contains replyTo destination. It will be
// added by the Cas Multiplier to the getMeta reply
if (message.getJMSReplyTo() != null) {
serviceDelegate.setFreeCasDestination(message.getJMSReplyTo());
}
// Check if this is a reply for a Ping sent in response to a timeout
if (serviceDelegate.isAwaitingPingReply()) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"handleMetadataReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_rcvd_ping_reply__INFO",
new Object[] {
message.getStringProperty(AsynchAEMessage.MessageFrom),
message.getStringProperty(AsynchAEMessage.ServerIP)});
}
// reset the state of the service. The client received its ping reply
serviceDelegate.resetAwaitingPingReply();
String casReferenceId = null;
if (serviceDelegate.getCasPendingReplyListSize() > 0 || serviceDelegate.getCasPendingDispatchListSize() > 0) {
serviceDelegate.restartTimerForOldestCasInOutstandingList();
// We got a reply to GetMeta ping. Send all CASes that have been
// placed on the pending dispatch queue to a service.
while( serviceDelegate.getState()==Delegate.OK_STATE && (casReferenceId = serviceDelegate.removeOldestFromPendingDispatchList()) != null ) {
ClientRequest cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
if (cachedRequest != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"handleMetadataReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_dispatch_delayed_cas__INFO",
new Object[] { casReferenceId, String.valueOf(cachedRequest.cas.hashCode())});
}
sendCAS(cachedRequest.getCAS(), cachedRequest);
}
}
} else {
ProcessTrace pt = new ProcessTrace_impl();
UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
notifyListeners(null, status, AsynchAEMessage.GetMeta);
}
// Handled Ping reply
return;
}
int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
removeFromCache(uniqueIdentifier);
try {
if (AsynchAEMessage.Exception == payload) {
ProcessTrace pt = new ProcessTrace_impl();
UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
Exception exception = retrieveExceptionFromMessage(message);
clientSideJmxStats.incrementMetaErrorCount();
status.addEventStatus("GetMeta", "Failed", exception);
notifyListeners(null, status, AsynchAEMessage.GetMeta);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"handleMetadataReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_received_exception_msg_INFO",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
getBrokerURI(),
message.getStringProperty(AsynchAEMessage.CasReference), exception });
}
abort = true;
initialized = false;
} else {
// Check serialization supported by the service against client configuration.
// If the client is configured to use Binary serialization *but* the service
// doesnt support it, change the client serialization to xmi. Old services will
// not return in a reply the type of serialization supported which implies "xmi".
// New services *always* return "binary" or "compressedBinaryXXX"
// as a default serialization. The client
// however may still want to serialize messages using xmi though.
if (!message.propertyExists(AsynchAEMessage.SERIALIZATION)) {
// Dealing with an old service here, check if there is a mismatch with the
// client configuration. If the client is configured with binary serialization
// override this and change serialization to "xmi".
if (getSerialFormat() != SerialFormat.XMI) {
// Override configured serialization
setSerialFormat(SerialFormat.XMI);
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_serialization_ovveride__WARNING", new Object[] {});
}
} else {
final int c = message.getIntProperty(AsynchAEMessage.SERIALIZATION);
if (getSerialFormat() != SerialFormat.XMI) {
// don't override if XMI - because the remote may have different type system
setSerialFormat((c == AsynchAEMessage.XMI_SERIALIZATION) ? SerialFormat.XMI :
(c == AsynchAEMessage.BINARY_SERIALIZATION) ? SerialFormat.BINARY :
SerialFormat.COMPRESSED_FILTERED);
}
}
String meta = ((TextMessage) message).getText();
ByteArrayInputStream bis = new ByteArrayInputStream(meta.getBytes());
XMLInputSource in1 = new XMLInputSource(bis, null);
// Adam - store ResouceMetaData in field so we can return it from getMetaData().
resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser()
.parseResourceMetaData(in1);
// if remote delegate, save type system
if (!brokerURI.startsWith("vm:")) { // test if remote
setRemoteTypeSystem(AggregateAnalysisEngineController_impl.getTypeSystemImpl(resourceMetadata));
}
casMultiplierDelegate = resourceMetadata.getOperationalProperties().getOutputsNewCASes();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_handling_meta_reply_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), meta });
}
// check the state of the client
if ( running && asynchManager != null ) {
// Merge the metadata only if the client is still running
asynchManager.addMetadata(resourceMetadata);
}
}
} catch (Exception e) {
throw e;
} finally {
getMetaSemaphore.release();
}
}
@SuppressWarnings("unchecked")
protected void notifyListeners(CAS aCAS, EntityProcessStatus aStatus, int aCommand, String serializedComponentStats) {
if ( aCommand == AsynchAEMessage.Process) {
((UimaASProcessStatusImpl)aStatus).
setPerformanceMetrics(UimaSerializer.deserializePerformanceMetrics(serializedComponentStats));
for (int i = 0; listeners != null && i < listeners.size(); i++) {
UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
statCL.entityProcessComplete(aCAS, aStatus);
}
}
}
protected void notifyListeners(CAS aCAS, EntityProcessStatus aStatus, int aCommand) {
for (int i = 0; listeners != null && i < listeners.size(); i++) {
UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
switch (aCommand) {
case AsynchAEMessage.GetMeta:
statCL.initializationComplete(aStatus);
break;
case AsynchAEMessage.CollectionProcessComplete:
statCL.collectionProcessComplete(aStatus);
break;
case AsynchAEMessage.Process:
case AsynchAEMessage.Ping:
statCL.entityProcessComplete(aCAS, aStatus);
break;
}
}
}
protected void cancelTimer(String identifier) {
ClientRequest request = null;
if (clientCache.containsKey(identifier)) {
request = (ClientRequest) clientCache.get(identifier);
if (request != null) {
request.cancelTimer();
}
}
}
private boolean isException(Message message) throws Exception {
int payload;
if (message.propertyExists(AsynchAEMessage.Payload)) {
payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
} else {
throw new InvalidMessageException("Message Does not Contain Payload property");
}
return (AsynchAEMessage.Exception == payload ? true : false);
}
private Exception retrieveExceptionFromMessage(Message message) throws Exception {
Exception exception = null;
try {
if (message instanceof ObjectMessage
&& ((ObjectMessage) message).getObject() instanceof Exception) {
exception = (Exception) ((ObjectMessage) message).getObject();
} else if (message instanceof TextMessage) {
exception = new UimaEEServiceException(((TextMessage) message).getText());
}
} catch( Exception e) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"retrieveExceptionFromMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
exception = new UimaEEServiceException("UIMA AS client is unable to de-serialize Exception from a remote service",e);
}
return exception;
}
private void handleProcessReplyFromSynchronousCall(ClientRequest cachedRequest, Message message)
throws Exception {
// Save reply message in the cache
cachedRequest.setMessage(message);
wakeUpSendThread(cachedRequest);
}
protected void wakeUpSendThread(ClientRequest cachedRequest) throws Exception {
if (threadMonitorMap.containsKey(cachedRequest.getThreadId())) {
ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest
.getThreadId());
// Unblock the sending thread so that it can complete processing
// of the reply. The message has been stored in the cache and
// when the thread wakes up due to notification below, it will
// retrieve the reply and process it.
if (threadMonitor != null) {
cachedRequest.setReceivedProcessCasReply();
threadMonitor.getMonitor().release();
}
}
}
/**
* Handles a ServiceInfo message returned from the Cas Multiplier. The primary purpose of this
* message is to provide the client with a dedicated queue object where the client may send
* messages to the specific CM service instance. An example of this would be a stop request that
* client needs to send to the specific Cas Multiplier.
*
* @param message
* - message received from a service
* @throws Exception
*/
protected void handleServiceInfo(Message message) throws Exception {
String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
ClientRequest casCachedRequest = null;
if ( casReferenceId != null ) {
casCachedRequest = (ClientRequest) clientCache.get(casReferenceId);
}
try {
if ( casCachedRequest != null ) {
// entering user provided callback. Handle exceptions.
UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),casCachedRequest.getCAS(),
casReferenceId);
String nodeIP = message.getStringProperty(AsynchAEMessage.ServerIP);
String pid = message.getStringProperty(AsynchAEMessage.UimaASProcessPID);
if ( casReferenceId != null && nodeIP != null && pid != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"handleServiceInfo",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_calling_onBeforeProcessCAS_FINE",
new Object[] {
casReferenceId,
String.valueOf(casCachedRequest.getCAS().hashCode())
});
}
onBeforeProcessCAS(status,nodeIP, pid);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"handleServiceInfo",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_completed_onBeforeProcessCAS_FINE",
new Object[] {
casReferenceId,
String.valueOf(casCachedRequest.getCAS().hashCode())
});
}
}
casCachedRequest.setHostIpProcessingCAS(message.getStringProperty(AsynchAEMessage.ServerIP));
if (message.getJMSReplyTo() != null && serviceDelegate.isCasPendingReply(casReferenceId)) {
casCachedRequest.setFreeCasNotificationQueue(message.getJMSReplyTo());
}
} else {
ClientRequest requestToCache = (ClientRequest) clientCache.get(uniqueIdentifier);
if ( requestToCache != null && requestToCache.isMetaRequest()) {
String nodeIP = message.getStringProperty(AsynchAEMessage.ServerIP);
String pid = message.getStringProperty(AsynchAEMessage.UimaASProcessPID);
if ( pid != null && nodeIP != null ) {
UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),null,
casReferenceId);
// notify client that the last request (GetMeta ) has been received by a service.
onBeforeProcessMeta(nodeIP, pid);
}
}
}
} catch( Exception e) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"handleServiceInfo", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
}
}
protected void decrementOutstandingCasCounter() {
// Received a reply, decrement number of outstanding CASes
long outstandingCasCount = outstandingCasRequests.decrementAndGet();
if (outstandingCasCount == 0) {
cpcReadySemaphore.release();
}
}
/**
* Handles response to Process CAS request. If the message originated in a service that is running
* in a separate jvm (remote), deserialize the CAS and notify the application of the completed
* analysis via application listener.
*
* @param message
* - jms message containing serialized CAS
*
* @throws Exception
*/
protected void handleProcessReply(Message message, boolean doNotify, ProcessTrace pt)
throws Exception {
if (!running) {
return;
}
int payload = -1;
String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
// Determine the type of payload in the message (XMI,Cas Reference,Exception,etc)
if (message.propertyExists(AsynchAEMessage.Payload)) {
payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
}
// Fetch entry from the client cache for a cas id returned from the service
// The client cache maintains an entry for every outstanding CAS sent to the
// service.
ClientRequest cachedRequest = null;
if (casReferenceId != null) {
cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
// Increment number of replies
if (cachedRequest != null && casReferenceId.equals(cachedRequest.getCasReferenceId())) {
// Received a reply, decrement number of outstanding CASes
decrementOutstandingCasCounter();
}
serviceDelegate.removeCasFromOutstandingList(casReferenceId);
}
if (AsynchAEMessage.Exception == payload) {
handleException(message, cachedRequest, true);
return;
}
// cachedRequest is only null if we are receiving child CASes from a
// Cas Multiplier. Otherwise, we drop the message as it is out of band
if ( cachedRequest == null && !casMultiplierDelegate ) {
// most likely a reply came in after the thread was interrupted
return;
}
// If the Cas Reference id not in the message check if the message contains an
// exception and if so, handle the exception and return.
if (casReferenceId == null) {
return;
}
if (message instanceof TextMessage
&& UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"handleProcessReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_handling_process_reply_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
message.getStringProperty(AsynchAEMessage.CasReference),
message.toString() + ((TextMessage) message).getText() });
}
if (cachedRequest != null) {
// Store the total latency for this CAS. The departure time is set right before the CAS
// is sent to a service.
cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_cas_reply_rcvd_FINE", new Object[] { casReferenceId, String.valueOf(cachedRequest.getCAS().hashCode())});
}
// If the CAS was sent from a synchronous API sendAndReceive(), wake up the thread that
// sent the CAS and process the reply
if (cachedRequest.isSynchronousInvocation()) {
handleProcessReplyFromSynchronousCall(cachedRequest, message);
} else {
deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, doNotify);
}
} else if (message.propertyExists(AsynchAEMessage.InputCasReference)) {
int command = message.getIntProperty(AsynchAEMessage.Command);
if (AsynchAEMessage.ServiceInfo != command) {
handleProcessReplyFromCasMultiplier(message, casReferenceId, payload);
}
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
// Most likely expired message. Already handled as timeout. Discard the message and move on
// to the next
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"handleProcessReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_received_expired_msg_INFO",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
message.getStringProperty(AsynchAEMessage.CasReference) });
}
}
}
private void handleProcessReplyFromCasMultiplier(Message message, String casReferenceId,
int payload /* , ClientRequest inputCasCachedRequest */) throws Exception {
// Check if the message contains a CAS that was generated by a Cas Multiplier. If so,
// verify that the message also includes an input CAS id and that such input CAS id
// exists in the client's cache.
// Fetch the input CAS Reference Id from which the CAS being processed was generated from
String inputCasReferenceId = message.getStringProperty(AsynchAEMessage.InputCasReference);
// Fetch the destination for Free CAS notification
Destination freeCASNotificationDestination = message.getJMSReplyTo();
if (freeCASNotificationDestination != null) {
TextMessage msg = createTextMessage();
msg.setText("");
setReleaseCASMessage(msg, casReferenceId);
// Create Message Producer for the Destination
MessageProducer msgProducer = getMessageProducer(freeCASNotificationDestination);
if (msgProducer != null) {
try {
// Send FreeCAS message to a Cas Multiplier
msgProducer.send(msg);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"handleProcessReplyFromCasMultiplier",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_sending_release_cas_FINEST",
new Object[] { freeCASNotificationDestination,
message.getStringProperty(AsynchAEMessage.CasReference) });
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"handleProcessReplyFromCasMultiplier", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_error_while_sending_msg__WARNING",
new Object[] { "Free Cas Temp Destination", e });
}
}
}
}
// Fetch an entry from the client cache for a given input CAS id. This would be an id
// of the CAS that the client sent out to the service.
ClientRequest inputCasCachedRequest = (ClientRequest) clientCache.get(inputCasReferenceId);
if (inputCasCachedRequest == null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
// Most likely expired message. Already handled as timeout. Discard the message and move on
// to the next
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"handleProcessReplyFromCasMultiplier",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_received_expired_msg_INFO",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
message.getStringProperty(AsynchAEMessage.CasReference) });
}
return;
}
if (inputCasCachedRequest.isSynchronousInvocation()) {
// with synchronous invocation, child CASes are thrown away. With sync API, the UIMA-AS client
// is not using callbacks.
if ( casReferenceId.equals(inputCasCachedRequest.getCasReferenceId())) {
handleProcessReplyFromSynchronousCall(inputCasCachedRequest, message);
} else {
return;
}
}
CAS cas = null;
if (message instanceof TextMessage) {
cas = deserializeCAS(((TextMessage) message).getText(), SHADOW_CAS_POOL);
} else {
long bodyLength = ((BytesMessage) message).getBodyLength();
byte[] serializedCas = new byte[(int) bodyLength];
((BytesMessage) message).readBytes(serializedCas);
cas = deserializeCAS(serializedCas, SHADOW_CAS_POOL);
}
completeProcessingReply(cas, casReferenceId, payload, true, message, inputCasCachedRequest,
null);
}
private boolean isShutdownException(Exception exception) throws Exception {
if (exception != null) {
if (exception instanceof ServiceShutdownException || exception.getCause() != null
&& exception.getCause() instanceof ServiceShutdownException) {
return true;
}
}
return false;
}
protected void handleNonProcessException(Exception exception )
throws Exception {
ProcessTrace pt = new ProcessTrace_impl();
UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
clientSideJmxStats.incrementMetaErrorCount();
status.addEventStatus("GetMeta", "Failed", exception);
notifyListeners(null, status, AsynchAEMessage.GetMeta);
}
protected void handleException(Exception exception, String casReferenceId, String inputCasReferenceId, ClientRequest cachedRequest, boolean doNotify)
throws Exception {
handleException(exception, casReferenceId, inputCasReferenceId, cachedRequest, doNotify, true);
}
protected void handleException(Exception exception, String casReferenceId, String inputCasReferenceId, ClientRequest cachedRequest, boolean doNotify, boolean rethrow)
throws Exception {
if (!isShutdownException(exception)) {
clientSideJmxStats.incrementProcessErrorCount();
}
if (exception != null && cachedRequest != null) {
cachedRequest.setException(exception);
cachedRequest.setProcessException();
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"handleException",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_received_exception_msg_INFO",
new Object[] { serviceDelegate.getComponentName(),
getBrokerURI(),
casReferenceId, exception });
}
try {
if (doNotify) {
ProcessTrace pt = new ProcessTrace_impl();
// HACK! Service should only send exceptions for CASes that we sent.
// Check if this or its input parent is known.
if (inputCasReferenceId != null) {
serviceDelegate.removeCasFromOutstandingList(inputCasReferenceId);
} else if (casReferenceId != null ) {
serviceDelegate.removeCasFromOutstandingList(casReferenceId);
}
UimaASProcessStatusImpl status;
if (cachedRequest != null ) {
// Add Cas referenceId(s) to enable matching replies with requests (ids may be null)
status = new UimaASProcessStatusImpl(pt, cachedRequest.getCAS(), casReferenceId,
inputCasReferenceId);
} else {
status = new UimaASProcessStatusImpl(pt, null, casReferenceId,
inputCasReferenceId);
}
status.addEventStatus("Process", "Failed", exception);
if (cachedRequest != null && !cachedRequest.isSynchronousInvocation()
&& cachedRequest.getCAS() != null) {
notifyListeners(cachedRequest.getCAS(), status, AsynchAEMessage.Process);
} else {
notifyListeners(null, status, AsynchAEMessage.Process);
}
// Done here
return;
} else {
if ( rethrow ) {
throw new ResourceProcessException(exception);
}
}
} catch (Exception e) {
throw e;
} finally {
if (cachedRequest != null) {
if (cachedRequest.isSynchronousInvocation() && cachedRequest.isProcessException()) {
// Wake up the send thread that is blocking waiting for a reply. When the thread
// receives the signal, it checks if the reply contains an exception and will
// not return control back to the client
wakeUpSendThread(cachedRequest);
}
// Dont release the CAS if the application uses synchronous API
if (!cachedRequest.isSynchronousInvocation() && cachedRequest.getCAS() != null) {
cachedRequest.getCAS().release();
}
}
removeFromCache(casReferenceId);
serviceDelegate.removeCasFromOutstandingList(casReferenceId);
decrementOutstandingCasCounter();
}
}
protected void handleException(Message message, ClientRequest cachedRequest, boolean doNotify)
throws Exception {
Exception exception = retrieveExceptionFromMessage(message);
String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
String inputCasReferenceId = message.getStringProperty(AsynchAEMessage.InputCasReference);
handleException(exception, casReferenceId, inputCasReferenceId, cachedRequest, doNotify);
}
private void completeProcessingReply(CAS cas, String casReferenceId, int payload,
boolean doNotify, Message message, ClientRequest cachedRequest, ProcessTrace pt)
throws Exception {
if (AsynchAEMessage.XMIPayload == payload || AsynchAEMessage.BinaryPayload == payload
|| AsynchAEMessage.CASRefID == payload) {
if (pt == null) {
pt = new ProcessTrace_impl();
}
try {
// Log stats and populate ProcessTrace object
logTimingInfo(message, pt, cachedRequest);
if (doNotify) {
UimaASProcessStatusImpl status;
String inputCasReferenceId = message.getStringProperty(AsynchAEMessage.InputCasReference);
if (inputCasReferenceId != null
&& inputCasReferenceId.equals(cachedRequest.getCasReferenceId())) {
status = new UimaASProcessStatusImpl(pt, cas,casReferenceId, inputCasReferenceId);
} else {
status = new UimaASProcessStatusImpl(pt, cas, casReferenceId);
}
if ( message.propertyExists(AsynchAEMessage.CASPerComponentMetrics)) {
// Add CAS identifier to enable matching replies with requests
notifyListeners(cas, status, AsynchAEMessage.Process, message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics));
} else {
// Add CAS identifier to enable matching replies with requests
notifyListeners(cas, status, AsynchAEMessage.Process);
}
} else { // synchronous sendAndReceive() was used
if (casReferenceId != null && message.propertyExists(AsynchAEMessage.CASPerComponentMetrics) ) {
cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
if ( cachedRequest != null && cachedRequest.getComponentMetricsList() != null ) {
cachedRequest.getComponentMetricsList().
addAll(UimaSerializer.deserializePerformanceMetrics(message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics)));
}
}
}
} finally {
// Dont release the CAS if the application uses synchronous API
if (remoteService && !cachedRequest.isSynchronousInvocation()) {
if (cas != null) {
cas.release();
}
}
removeFromCache(casReferenceId);
}
}
}
private void logTimingInfo(Message message, ProcessTrace pt, ClientRequest cachedRequest)
throws Exception {
clientSideJmxStats.incrementTotalNumberOfCasesProcessed();
if (message.getStringProperty(AsynchAEMessage.CasReference) != null) {
String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
if (clientCache.containsKey(casReferenceId)) {
ClientRequest cacheEntry = (ClientRequest) clientCache.get(casReferenceId);
if (cacheEntry == null) {
return;
}
// Add time waiting for reply to the client JMX stats
long timeWaitingForReply = cacheEntry.getTimeWaitingForReply();
clientSideJmxStats.incrementTotalTimeWaitingForReply(timeWaitingForReply);
// Add CAS response latency time to the client JMX stats
long responseLatencyTime = cacheEntry.getSerializationTime() + timeWaitingForReply
+ cacheEntry.getDeserializationTime();
clientSideJmxStats.incrementTotalResponseLatencyTime(responseLatencyTime);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME)
.logrb(
Level.FINEST,
CLASS_NAME.getName(),
"handleProcessReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
"Total Time Waiting For Reply",
(float) timeWaitingForReply / (float) 1000000 });
}
pt.addEvent("UimaEE", "process", "Total Time Waiting For Reply",
(int) (timeWaitingForReply / 1000000), "");
}
}
if (message.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
long timeToSerializeCAS = message.getLongProperty(AsynchAEMessage.TimeToSerializeCAS);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"handleProcessReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
"Time To Serialize Cas", (float) timeToSerializeCAS / (float) 1000000 });
}
pt.addEvent("UimaEE", "process", "Time To Serialize Cas",
(int) (timeToSerializeCAS / 1000000), "");
// Add the client serialization overhead to the value returned from a service
timeToSerializeCAS += cachedRequest.getSerializationTime();
clientSideJmxStats.incrementTotalSerializationTime(timeToSerializeCAS);
}
if (message.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
long timeToDeserializeCAS = message.getLongProperty(AsynchAEMessage.TimeToDeserializeCAS);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"handleProcessReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
"Time To Deserialize Cas", (float) timeToDeserializeCAS / (float) 1000000 });
}
pt.addEvent("UimaEE", "process", "Time To Deserialize Cas",
(int) (timeToDeserializeCAS / 1000000), "");
// Add the client deserialization overhead to the value returned from a service
timeToDeserializeCAS += cachedRequest.getDeserializationTime();
clientSideJmxStats.incrementTotalDeserializationTime(timeToDeserializeCAS);
}
if (message.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
long timeWaitingForCAS = message.getLongProperty(AsynchAEMessage.TimeWaitingForCAS);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"handleProcessReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
"Time to Wait for CAS", (float) timeWaitingForCAS / (float) 1000000 });
}
pt.addEvent("UimaEE", "process", "Time to Wait for CAS", (int) (timeWaitingForCAS / 1000000),
"");
}
if (message.propertyExists(AsynchAEMessage.TimeInService)) {
long ttimeInService = message.getLongProperty(AsynchAEMessage.TimeInService);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"handleProcessReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
"Time In Service", (float) ttimeInService / (float) 1000000 });
}
pt.addEvent("UimaEE", "process", "Time In Service", (int) (ttimeInService / 1000000), "");
}
if (message.propertyExists(AsynchAEMessage.TotalTimeSpentInAnalytic)) {
long totaltimeInService = message.getLongProperty(AsynchAEMessage.TotalTimeSpentInAnalytic);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"handleProcessReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
"Total Time In Service", (float) totaltimeInService / (float) 1000000 });
}
pt.addEvent("UimaEE", "process", "Total Time In Service",
(int) (totaltimeInService / 1000000), "");
}
if (message.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
long totaltimeInProcessCAS = message.getLongProperty(AsynchAEMessage.TimeInProcessCAS);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"handleProcessReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
"Total Time In Process CAS", (float) totaltimeInProcessCAS / (float) 1000000 });
}
float timeInMillis = (float) totaltimeInProcessCAS / (float) 1000000;
pt.addEvent("UimaEE", "process", "Total Time In Process CAS", (int) timeInMillis, "");
clientSideJmxStats.incrementTotalTimeToProcess(totaltimeInProcessCAS);
}
if (message.propertyExists(AsynchAEMessage.IdleTime)) {
long totalIdletime = message.getLongProperty(AsynchAEMessage.IdleTime);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"handleProcessReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_timer_detail_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
"Idle Time Waiting For CAS", (float) totalIdletime / (float) 1000000 });
}
pt.addEvent("UimaEE", "process", "Idle Time Waiting For CAS",
(int) (totalIdletime / 1000000), "");
clientSideJmxStats.incrementTotalIdleTime(totalIdletime);
}
if (message.propertyExists(AsynchAEMessage.ServerIP)) {
pt.addEvent("UimaEE", "process", "Service IP", 0, message
.getStringProperty(AsynchAEMessage.ServerIP));
}
}
protected void removeFromCache(String aCasReferenceId) {
if (aCasReferenceId != null && clientCache.containsKey(aCasReferenceId)) {
ClientRequest requestToCache = (ClientRequest) clientCache.get(aCasReferenceId);
if (requestToCache != null) {
requestToCache.removeEntry(aCasReferenceId);
}
clientCache.remove(aCasReferenceId);
}
}
protected CAS deserialize(String aSerializedCAS, CAS aCAS) throws Exception {
XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
uimaSerializer.deserializeCasFromXmi(aSerializedCAS, aCAS, deserSharedData, true, -1);
return aCAS;
}
protected CAS deserialize(String aSerializedCAS, CAS aCAS,
XmiSerializationSharedData deserSharedData, boolean deltaCas) throws Exception {
if (deltaCas) {
uimaSerializer.deserializeCasFromXmi(aSerializedCAS, aCAS, deserSharedData, true,
deserSharedData.getMaxXmiId(), AllowPreexistingFS.allow);
} else {
uimaSerializer.deserializeCasFromXmi(aSerializedCAS, aCAS, deserSharedData, true, -1);
}
return aCAS;
}
protected CAS deserialize(byte[] binaryData, ClientRequest cachedRequest) throws Exception {
CAS cas = cachedRequest.getCAS();
uimaSerializer.deserializeCasFromBinary(binaryData, cas);
return cas;
}
protected CAS deserializeCAS(String aSerializedCAS, ClientRequest cachedRequest) throws Exception {
CAS cas = cachedRequest.getCAS();
return deserialize(aSerializedCAS, cas);
}
/**
* handle both ordinary binary and compressed6 binary
* @param aSerializedCAS
* @param cachedRequest
* @return
* @throws Exception
*/
protected CAS deserializeCAS(byte[] aSerializedCAS, ClientRequest cachedRequest) throws Exception {
CAS cas = cachedRequest.getCAS();
ReuseInfo reuseInfo = cachedRequest.getCompress6ReuseInfo();
ByteArrayInputStream bais = new ByteArrayInputStream(aSerializedCAS);
if (reuseInfo != null) {
Serialization.deserializeCAS(cas, bais, null, reuseInfo);
} else {
((CASImpl)cas).reinit(bais);
}
// uimaSerializer.deserializeCasFromBinary(aSerializedCAS, cas);
return cas;
}
// never called 5/2013 ??
protected CAS deserializeCAS(byte[] aSerializedCAS, CAS aCas) throws Exception {
uimaSerializer.deserializeCasFromBinary(aSerializedCAS, aCas);
return aCas;
}
protected CAS deserializeCAS(String aSerializedCAS, ClientRequest cachedRequest, boolean deltaCas)
throws Exception {
CAS cas = cachedRequest.getCAS();
return deserialize(aSerializedCAS, cas, cachedRequest.getXmiSerializationSharedData(), deltaCas);
}
protected CAS deserializeCAS(String aSerializedCAS, String aCasPoolName) throws Exception {
CAS cas = asynchManager.getNewCas(aCasPoolName);
return deserialize(aSerializedCAS, cas);
}
protected CAS deserializeCAS(byte[] aSerializedCAS, String aCasPoolName) throws Exception {
CAS cas = asynchManager.getNewCas(aCasPoolName);
uimaSerializer.deserializeCasFromBinary(aSerializedCAS, cas);
return cas;
}
/**
* Listener method receiving JMS Messages from the response queue.
*
*/
public void onMessage(final Message message) {
// Process message in a separate thread. Previously the message was processed in ActiveMQ dispatch thread.
// This onMessage() method is called by ActiveMQ code from a critical region protected with a lock. The lock
// is only released if this method returns. Running in a dispatch thread caused a hang when an application
// decided to call System.exit() in any of its callback listener methods. The UIMA AS client adds a
// ShutdownHoook to the JVM to enable orderly shutdown which includes stopping JMS Consumer, JMS Producer
// and finally stopping JMS Connection. The ShutdownHook support was added to the client in case the
// application doesnt call client's stop() method. Now, the hang was caused by the fact that the dispatch
// thread was used to call System.exit() which in turn executed client's ShutdownHook code. The ShutdownHook
// code runs in a separate thread, but the the JVM blocks the dispatch thread until the ShutdownHook
// finishes. It never will though, since the ShutdownHook is calling ActiveMQSession.close() which tries to enter
// the same critical region that the dispatch thread is still stuck into. DEADLOCK.
// The code below uses a simple FixedThreadPool Executor with a single thread. This thread is reused instead
// creating one on the fly.
exec.execute( new Runnable() {
public void run() {
try {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_msg_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
}
if (!message.propertyExists(AsynchAEMessage.Command)) {
return;
}
int command = message.getIntProperty(AsynchAEMessage.Command);
if (AsynchAEMessage.CollectionProcessComplete == command) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_cpc_reply_FINE",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
}
handleCollectionProcessCompleteReply(message);
} else if (AsynchAEMessage.GetMeta == command) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_meta_reply_FINE",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
}
handleMetadataReply(message);
} else if (AsynchAEMessage.Process == command) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_process_reply_FINE",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
}
handleProcessReply(message, true, null);
} else if (AsynchAEMessage.ServiceInfo == command) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_received_service_info_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
}
handleServiceInfo(message);
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
}
}
}
});
}
/**
* Gets the ProcessingResourceMetadata for the asynchronous AnalysisEngine.
*/
public ProcessingResourceMetaData getMetaData() throws ResourceInitializationException {
return resourceMetadata;
}
public String sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException {
return sendAndReceiveCAS(aCAS, null, null);
}
public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws ResourceProcessException {
return sendAndReceiveCAS(aCAS, pt, null);
}
public String sendAndReceiveCAS(CAS aCAS, List<AnalysisEnginePerformanceMetrics> componentMetricsList) throws ResourceProcessException {
return sendAndReceiveCAS(aCAS, null, componentMetricsList);
}
/**
* This is a synchronous method which sends a message to a destination and blocks waiting for a
* reply.
*/
public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt, List<AnalysisEnginePerformanceMetrics> componentMetricsList) throws ResourceProcessException {
if (!running) {
throw new ResourceProcessException(new Exception("Uima EE Client Not In Running State"));
}
if (!serviceDelegate.isSynchronousAPI()) {
// Change the flag to indicate synchronous invocation.
// This info will be needed to handle Ping replies.
// Different code is used for handling PING replies for
// sync and async API.
serviceDelegate.setSynchronousAPI();
}
String casReferenceId = null;
// keep handle to CAS, we'll deserialize into this same CAS later
sendAndReceiveCAS = aCAS;
ThreadMonitor threadMonitor = null;
if (threadMonitorMap.containsKey(Thread.currentThread().getId())) {
threadMonitor = (ThreadMonitor) threadMonitorMap.get(Thread.currentThread().getId());
} else {
threadMonitor = new ThreadMonitor(Thread.currentThread().getId());
threadMonitorMap.put(Thread.currentThread().getId(), threadMonitor);
}
ClientRequest cachedRequest = produceNewClientRequestObject();
cachedRequest.setSynchronousInvocation();
// save application provided List where the performance stats will be copied
// when reply comes back
cachedRequest.setComponentMetricsList(componentMetricsList);
// This is synchronous call, acquire and hold the semaphore before
// dispatching a CAS to a service. The semaphore will be released
// iff:
// a) reply is received (success or failure with exception)
// b) timeout occurs
// c) client is stopped
// Once the semaphore is acquired and the CAS is dispatched
// the thread will block in trying to acquire the semaphore again
// below.
if (threadMonitor != null && threadMonitor.getMonitor() != null) {
try {
threadMonitor.getMonitor().acquire();
} catch (InterruptedException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_interrupted_INFO", new Object[] { casReferenceId, String.valueOf(aCAS.hashCode())});
}
// cancel the timer if it is associated with a CAS this thread is waiting for. This would be
// the oldest CAS submitted to a queue for processing. The timer will be canceled and restarted
// for the second oldest CAS in the outstanding list.
serviceDelegate.cancelTimerForCasOrPurge(casReferenceId);
throw new ResourceProcessException(e);
}
}
try {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_cas_submitting_FINE", new Object[] { casReferenceId, String.valueOf(aCAS.hashCode()), Thread.currentThread().getId()});
}
// send CAS. This call does not block. Instead we will block the sending thread below.
casReferenceId = sendCAS(aCAS, cachedRequest);
} catch( ResourceProcessException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { e });
}
threadMonitor.getMonitor().release();
removeFromCache(casReferenceId);
throw e;
}
if (threadMonitor != null && threadMonitor.getMonitor() != null) {
while (running) {
try {
// Block sending thread until a reply is received. The thread
// will be signaled either when a reply to the request just
// sent is received OR a Ping reply was received. The latter
// is necessary to allow handling of CASes delayed due to
// a timeout. A previous request timed out and the service
// state was changed to TIMEDOUT. While the service is in this
// state all sending threads add outstanding CASes to the list
// of CASes pending dispatch and each waits until the state
// of the service changes to OK. The state is changed to OK
// when the client receives a reply to a PING request. When
// the Ping reply comes, the client will signal this thread.
// The thread checks the list of CASes pending dispatch trying
// to find an entry that matches ID of the CAS previously
// delayed. If the CAS is found in the delayed list, it will
// be removed from the list and send to the service for
// processing. The 'wasSignaled' flag is only set when the
// CAS reply is received. Ping reply logic does not change
// this flag.
threadMonitor.getMonitor().acquire();
// Send thread was awoken by either process reply or ping reply
// If the service is in the ok state and the CAS is in the
// list of CASes pending dispatch, remove the CAS from the list
// and send it to the service.
if (cachedRequest.isTimeoutException() || cachedRequest.isProcessException()) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_process_exception_handler5__WARNING", new Object[] { String.valueOf(aCAS.hashCode()), Thread.currentThread().getId()});
}
// Handled outside of the while-loop below
break;
}
if (running && serviceDelegate.getState() == Delegate.OK_STATE
&& serviceDelegate.removeCasFromPendingDispatchList(casReferenceId)) {
sendCAS(aCAS, cachedRequest);
} else {
break; // done here, received a reply or the client is not running
}
} catch (InterruptedException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_interrupted_INFO", new Object[] { Thread.currentThread().getId(), casReferenceId, String.valueOf(aCAS.hashCode())});
}
// try to remove from pending dispatch list. If not there, remove from pending reply list
if ( !serviceDelegate.removeCasFromPendingDispatchList(casReferenceId)) {
serviceDelegate.removeCasFromOutstandingList(casReferenceId);
}
// cancel the timer if it is associated with a CAS this thread is waiting for. This would be
// the oldest CAS submitted to a queue for processing. The timer will be canceled and restarted
// for the second oldest CAS in the outstanding list.
serviceDelegate.cancelTimerForCasOrPurge(casReferenceId);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_canceled_timer_INFO", new Object[] { Thread.currentThread().getId(), casReferenceId, String.valueOf(aCAS.hashCode())});
}
removeFromCache(casReferenceId);
throw new ResourceProcessException(e);
} finally {
threadMonitor.getMonitor().release();
}
}
} // if
if (abort) {
throw new ResourceProcessException(new RuntimeException("Uima AS Client API Stopping"));
}
// check if timeout exception
if (cachedRequest.isTimeoutException()) {
String qName="";
try {
qName = getEndPointName();
} catch( Exception e) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
// Request To Process Cas Has Timed-out.
throw new ResourceProcessException(JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "" +
"UIMAJMS_process_timeout_WARNING",
new Object[]{qName, getBrokerURI(), cachedRequest.getHostIpProcessingCAS()},
new UimaASProcessCasTimeout("UIMA AS Client Timed Out Waiting for Reply From Service:"+qName+" Broker:"+getBrokerURI()));
}
// If a reply contains process exception, throw an exception and let the
// listener decide what happens next
if (cachedRequest.isProcessException()) {
String qName="";
try {
qName = getEndPointName();
} catch( Exception e) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
throw new ResourceProcessException(
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "" +
"UIMAJMS_received_exception_msg_INFO",
new Object[]{qName, getBrokerURI(), casReferenceId},
cachedRequest.getException());
}
try {
// Process reply in the send thread
Message message = cachedRequest.getMessage();
if (message != null) {
deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, false);
}
} catch (ResourceProcessException rpe) {
throw rpe;
} catch (Exception e) {
throw new ResourceProcessException(e);
}
return casReferenceId;
}
private void deserializeAndCompleteProcessingReply(String casReferenceId, Message message,
ClientRequest cachedRequest, ProcessTrace pt, boolean doNotify) throws Exception {
if (!running) {
return;
}
int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
if (message.propertyExists(AsynchAEMessage.CasSequence)) {
handleProcessReplyFromCasMultiplier(message, casReferenceId, payload);// , cachedRequest);
} else {
long t1 = System.nanoTime();
boolean deltaCas = false;
if (message.propertyExists(AsynchAEMessage.SentDeltaCas)) {
deltaCas = message.getBooleanProperty(AsynchAEMessage.SentDeltaCas);
}
CAS cas = null;
if (message instanceof TextMessage) {
cas = deserializeCAS(((TextMessage) message).getText(), cachedRequest, deltaCas);
} else {
long bodyLength = ((BytesMessage) message).getBodyLength();
byte[] serializedCas = new byte[(int) bodyLength];
((BytesMessage) message).readBytes(serializedCas);
cas = deserializeCAS(serializedCas, cachedRequest);
}
cachedRequest.setDeserializationTime(System.nanoTime() - t1);
completeProcessingReply(cas, casReferenceId, payload, doNotify, message, cachedRequest, pt);
}
}
protected void notifyOnTimout(CAS aCAS, String anEndpoint, int aTimeoutKind, String casReferenceId) {
ProcessTrace pt = new ProcessTrace_impl();
UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt, aCAS, casReferenceId);
switch (aTimeoutKind) {
case (MetadataTimeout):
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_meta_timeout_WARNING", new Object[] { anEndpoint });
}
status.addEventStatus("GetMeta", "Failed", new UimaASMetaRequestTimeout("UIMA AS Client Timed Out Waiting For GetMeta Reply From a Service On Queue:"+anEndpoint));
notifyListeners(null, status, AsynchAEMessage.GetMeta);
abort = true;
getMetaSemaphore.release();
break;
case (PingTimeout):
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_meta_timeout_WARNING", new Object[] { anEndpoint });
}
status.addEventStatus("Ping", "Failed", new UimaASPingTimeout("UIMA AS Client Timed Out Waiting For Ping Reply From a Service On Queue:"+anEndpoint));
notifyListeners(null, status, AsynchAEMessage.Ping);
// The main thread could be stuck waiting for a CAS. Grab any CAS in the
// client cache and release it so that we can shutdown.
if (!clientCache.isEmpty()) {
ClientRequest anyCasRequest = clientCache.elements().nextElement();
if (anyCasRequest.getCAS() != null) {
anyCasRequest.getCAS().release();
}
}
abort = true;
break;
case (CpCTimeout):
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_cpc_timeout_INFO", new Object[] { anEndpoint });
}
status.addEventStatus("CpC", "Failed",
new UimaASCollectionProcessCompleteTimeout("UIMA AS Client Timed Out Waiting For CPC Reply From a Service On Queue:"+anEndpoint));
// release the semaphore acquired in collectionProcessingComplete()
cpcReplySemaphore.release();
notifyListeners(null, status, AsynchAEMessage.CollectionProcessComplete);
break;
case (ProcessTimeout):
if ( casReferenceId != null ) {
ClientRequest cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
if (cachedRequest != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_process_timeout_WARNING", new Object[] { anEndpoint, getBrokerURI(), cachedRequest.getHostIpProcessingCAS() });
}
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
// if missing for any reason ...
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_received_expired_msg_INFO",
new Object[] { anEndpoint, casReferenceId });
}
return;
}
// Store the total latency for this CAS. The departure time is set right before the CAS
// is sent to a service.
cachedRequest.setTimeWaitingForReply(System.nanoTime()
- cachedRequest.getCASDepartureTime());
// mark timeout exception
cachedRequest.setTimeoutException();
if (cachedRequest.isSynchronousInvocation()) {
// Signal a thread that we received a reply, if in the map
if (threadMonitorMap.containsKey(cachedRequest.getThreadId())) {
ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest
.getThreadId());
// Unblock the sending thread so that it can complete processing with an error
if (threadMonitor != null) {
threadMonitor.getMonitor().release();
cachedRequest.setReceivedProcessCasReply(); // should not be needed
}
}
} else {
// notify the application listener with the error
if ( serviceDelegate.isPingTimeout()) {
exc = new UimaASProcessCasTimeout(new UimaASPingTimeout("UIMA AS Client Ping Time While Waiting For Reply From a Service On Queue:"+anEndpoint));
serviceDelegate.resetPingTimeout();
} else {
exc = new UimaASProcessCasTimeout("UIMA AS Client Timed Out Waiting For CAS:"+casReferenceId+ " Reply From a Service On Queue:"+anEndpoint);
}
status.addEventStatus("Process", "Failed", exc);
notifyListeners(aCAS, status, AsynchAEMessage.Process);
}
boolean isSynchronousCall = cachedRequest.isSynchronousInvocation();
cachedRequest.removeEntry(casReferenceId);
serviceDelegate.removeCasFromOutstandingList(casReferenceId);
// Check if all replies have been received
long outstandingCasCount = outstandingCasRequests.decrementAndGet();
if (outstandingCasCount == 0) {
cpcReadySemaphore.release();
}
//
if ( !isSynchronousCall && serviceDelegate.getCasPendingReplyListSize() > 0) {
String nextOutstandingCasReferenceId =
serviceDelegate.getOldestCasIdFromOutstandingList();
if ( nextOutstandingCasReferenceId != null ) {
cachedRequest = (ClientRequest) clientCache.get(nextOutstandingCasReferenceId);
if ( cachedRequest != null && cachedRequest.getCAS() != null ) {
try {
sendCAS(cachedRequest.getCAS());
} catch( Exception e) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"notifyOnTimout", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
}
}
}
}
}
break;
} // case
}
/**
* @override
*/
protected MessageProducer getMessageProducer(Destination destination) throws Exception {
return null;
}
/**
* has some cache values for CAS, similar to class CacheEntry
*
*/
public class ClientRequest {
private Timer timer = null;
private long processTimeout = 0L;
private long metadataTimeout = 0L;
private long cpcTimeout = 0L;
private String casReferenceId = null;
private BaseUIMAAsynchronousEngineCommon_impl uimaEEEngine = null;
private volatile boolean isSerializedCAS;
private String serializedCAS;
private CAS cas;
private volatile boolean isMetaRequest = false;
private volatile boolean isCPCRequest = false;
private volatile boolean isRemote = true;
private String endpoint;
private long threadId = Thread.currentThread().getId();
private Message message;
private volatile boolean synchronousInvocation;
private volatile boolean timeoutException;
private long casDepartureTime;
private long timeWaitingForReply;
private long serializationTime;
private long deserializationTime;
private long metaTimeoutErrorCount;
private long processTimeoutErrorCount;
private long processErrorCount;
private XmiSerializationSharedData sharedData;
private ReuseInfo compress6ReuseInfo;
private byte[] binaryCas = null;
private volatile boolean isBinaryCas = false;
private Exception exception;
private volatile boolean processException;
private Destination freeCasNotificationQueue = null;
private String hostIpProcessingCAS;
List<AnalysisEnginePerformanceMetrics> componentMetricsList;
public List<AnalysisEnginePerformanceMetrics> getComponentMetricsList() {
return componentMetricsList;
}
public void setComponentMetricsList(
List<AnalysisEnginePerformanceMetrics> componentMetricsList) {
this.componentMetricsList = componentMetricsList;
}
public String getHostIpProcessingCAS() {
return hostIpProcessingCAS;
}
public void setHostIpProcessingCAS(String hostIpProcessingCAS) {
this.hostIpProcessingCAS = hostIpProcessingCAS;
}
public Destination getFreeCasNotificationQueue() {
return freeCasNotificationQueue;
}
public void setFreeCasNotificationQueue(Destination freeCasNotificationQueue) {
this.freeCasNotificationQueue = freeCasNotificationQueue;
}
public boolean isProcessException() {
return processException;
}
public void setProcessException() {
this.processException = true;
}
public Exception getException() {
return exception;
}
public void setException(Exception exception) {
this.exception = exception;
}
public long getMetaTimeoutErrorCount() {
return metaTimeoutErrorCount;
}
public void setMetaTimeoutErrorCount(long timeoutErrorCount) {
metaTimeoutErrorCount = timeoutErrorCount;
}
public long getProcessTimeoutErrorCount() {
return processTimeoutErrorCount;
}
public void setProcessTimeoutErrorCount(long timeoutErrorCount) {
processTimeoutErrorCount = timeoutErrorCount;
}
public long getProcessErrorCount() {
return processErrorCount;
}
public void setProcessErrorCount(long processErrorCount) {
this.processErrorCount = processErrorCount;
}
public long getSerializationTime() {
return serializationTime;
}
public void setSerializationTime(long serializationTime) {
this.serializationTime = serializationTime;
}
public long getDeserializationTime() {
return deserializationTime;
}
public void setDeserializationTime(long deserializationTime) {
this.deserializationTime = deserializationTime;
}
public boolean isSynchronousInvocation() {
return synchronousInvocation;
}
public void setSynchronousInvocation() {
synchronousInvocation = true;
}
public boolean isTimeoutException() {
return timeoutException;
}
public void setTimeoutException() {
timeoutException = true;
}
public void clearTimeoutException() {
timeoutException = false;
}
public Message getMessage() {
return message;
}
public void setMessage(Message message) {
this.message = message;
}
public ClientRequest(String aCasReferenceId, BaseUIMAAsynchronousEngineCommon_impl aUimaEEEngine) // ,
// long
// aTimeout)
{
uimaEEEngine = aUimaEEEngine;
casReferenceId = aCasReferenceId;
sharedData = null;
}
public String getCasReferenceId() {
return casReferenceId;
}
public void setThreadId(long aThreadId) {
threadId = aThreadId;
}
public long getThreadId() {
return threadId;
}
public void setReceivedProcessCasReply() {
}
public void setMetadataTimeout(int aTimeout) {
metadataTimeout = aTimeout;
}
public void setProcessTimeout(int aTimeout) {
processTimeout = aTimeout;
}
public long getProcessTimeout() {
return processTimeout;
}
public void setCpcTimeout(int aTimeout) {
cpcTimeout = aTimeout;
}
public void setEndpoint(String anEndpoint) {
endpoint = anEndpoint;
}
public void setIsRemote(boolean aRemote) {
isRemote = aRemote;
}
public boolean isRemote() {
return isRemote;
}
public void setCAS(CAS aCAS) {
cas = aCAS;
}
public CAS getCAS() {
return cas;
}
public void setCAS(String aSerializedCAS) {
serializedCAS = aSerializedCAS;
isSerializedCAS = true;
}
public void setBinaryCAS(byte[] aBinaryCas) {
binaryCas = aBinaryCas;
isBinaryCas = true;
}
public boolean isBinaryCAS() {
return isBinaryCas;
}
public byte[] getBinaryCAS() {
return binaryCas;
}
public String getXmiCAS() {
return serializedCAS;
}
public void startTimer() {
Date timeToRun = null;
final ClientRequest _clientReqRef = this;
if (isMetaRequest()) {
timeToRun = new Date(System.currentTimeMillis() + metadataTimeout);
} else if (isCPCRequest()) {
timeToRun = new Date(System.currentTimeMillis() + cpcTimeout);
} else {
timeToRun = new Date(System.currentTimeMillis() + processTimeout);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "startTimer",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_starting_timer_FINEST",
new Object[] { endpoint });
}
timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "run",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_timer_expired_INFO",
new Object[] { endpoint, casReferenceId });
}
CAS cas = null;
if (isSerializedCAS) {
try {
if (isRemote) {
if (isBinaryCas) {
cas = deserialize(binaryCas, _clientReqRef);
} else {
cas = deserializeCAS(serializedCAS, _clientReqRef);
}
} else {
cas = null; // not supported for collocated
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"startTimer.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
}
}
}
int timeOutKind;
if (isMetaRequest()) {
timeOutKind = MetadataTimeout;
initialized = false;
abort = true;
metaTimeoutErrorCount++;
clientSideJmxStats.incrementMetaTimeoutErrorCount();
getMetaSemaphore.release();
} else if (isCPCRequest()) {
try {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"startTimer.run()", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_timedout_waiting_for_CPC__WARNING", getEndPointName());
}
} catch (Exception e) {
}
timeOutKind = CpCTimeout;
cpcReadySemaphore.release();
} else {
timeOutKind = ProcessTimeout;
processTimeoutErrorCount++;
clientSideJmxStats.incrementProcessTimeoutErrorCount();
}
uimaEEEngine.notifyOnTimout(cas, endpoint, timeOutKind, getCasReferenceId());
timer.cancel();
if (cas != null) {
cas.release();
}
return;
}
}, timeToRun);
}
public void removeEntry(String aCasReferenceId) {
if (uimaEEEngine.clientCache.containsKey(casReferenceId)) {
uimaEEEngine.clientCache.remove(casReferenceId);
}
}
public void cancelTimer() {
if (timer != null) {
timer.cancel();
}
}
public boolean isCPCRequest() {
return isCPCRequest;
}
public void setCPCRequest(boolean isCPCRequest) {
this.isCPCRequest = isCPCRequest;
}
public boolean isMetaRequest() {
return isMetaRequest;
}
public void setMetaRequest(boolean isMetaRequest) {
this.isMetaRequest = isMetaRequest;
}
public void setCASDepartureTime(long aDepartureTime) {
casDepartureTime = aDepartureTime;
}
public long getCASDepartureTime() {
return casDepartureTime;
}
public void setTimeWaitingForReply(long aTimeWaitingForReply) {
timeWaitingForReply = aTimeWaitingForReply;
}
public long getTimeWaitingForReply() {
return timeWaitingForReply;
}
public XmiSerializationSharedData getXmiSerializationSharedData() {
return sharedData;
}
public void setXmiSerializationSharedData(XmiSerializationSharedData data) {
this.sharedData = data;
}
public ReuseInfo getCompress6ReuseInfo() {
return compress6ReuseInfo;
}
public void setCompress6ReuseInfo(ReuseInfo compress6ReuseInfo) {
this.compress6ReuseInfo = compress6ReuseInfo;
}
}
protected static class ThreadMonitor {
private long threadId;
private Semaphore monitor = new Semaphore(1);
public ThreadMonitor(long aThreadId) {
threadId = aThreadId;
}
public long getThreadId() {
return threadId;
}
public Semaphore getMonitor() {
return monitor;
}
}
/**
* Called when the producer thread is fully initialized
*/
protected void onProducerInitialized() {
producerInitialized = true;
}
public boolean connectionOpen() {
SharedConnection sharedConnection;
if ( (sharedConnection = lookupConnection(getBrokerURI())) != null ) {
return sharedConnection.isConnectionValid();
}
return false;
}
/**
* Continuously tries to recover connection a broker. it gives up
* when the client is stopped or the connection is recovered.
*/
public boolean recoverSharedConnectionIfClosed() {
SharedConnection sharedConnection;
if ( !connectionOpen() ) {
sharedConnection = lookupConnection(getBrokerURI());
while ( running ) {
// blocks until connection is refreshed
try {
sharedConnection.retryConnectionUntilSuccessfull();
break;
} catch( Exception e) {
// will retry until successful or the client is not running
}
}
// if still running inject new connection
if ( running ) {
// Inject a new Connection object into an object that sends
// messages to a service. This call invalidates all Session
// and Producer objects.
getDispatcher().setConnection(sharedConnection.getConnection());
}
return true;
}
return false;
}
public void onException(Exception aFailure, String aDestination) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "onException",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_error_while_sending_msg__WARNING",
new Object[] { aDestination, aFailure });
}
try {
stop();
} catch( Exception e) {
e.printStackTrace();
}
}
/**
* @override
*/
protected void setReleaseCASMessage(TextMessage msg, String aCasReferenceId) throws Exception {
}
protected SharedConnection lookupConnection(String brokerUrl) {
if ( brokerUrl != null ) {
if ( sharedConnections.containsKey(brokerUrl) ) {
return sharedConnections.get(brokerUrl);
}
}
return null;
}
// This class is used to share JMS Connection by all instances of UIMA AS
// client deployed in the same JVM.
public static class SharedConnection {
private static final Class CLASS_NAME = SharedConnection.class;
public enum ConnectionState { CLOSED, FAILED, WAITING_FOR_BROKER, OPEN };
private volatile Connection connection;
private volatile boolean stop = false;
private ConnectionState state = ConnectionState.CLOSED;
private Object stateMonitor = new Object();
private Object mux = new Object();
private String brokerURL;
private ConnectionValidator connectionValidator;
private Object destroyMux = new Object();
private ConnectionFactory connectionFactory = null;
private List<BaseUIMAAsynchronousEngineCommon_impl> clientList =
new ArrayList<BaseUIMAAsynchronousEngineCommon_impl>();
public SharedConnection( ConnectionFactory connectionFactory , String brokerURL ) {
this.connectionFactory = connectionFactory;
this.brokerURL = brokerURL;
}
public String getBroker() {
return brokerURL;
}
public void setConnectionValidator( ConnectionValidator validator ) {
connectionValidator = validator;
}
public boolean isOpen() {
return state == ConnectionState.OPEN;
}
public boolean isConnectionValid() {
if ( connectionValidator == null ) {
return false;
}
if ( connectionValidator.connectionClosedOrInvalid(connection) == false ) {
return true;
}
return false;
}
/**
* Using jndi context look the connection factory and
* attempt to create broker connection. Throws exception
* if not successfull.
*/
public void create() throws Exception {
if ( connectionFactory == null ) {
throw new InstantiationException("UIMA AS Client Unable to Initialize SharedConnection Object. ConnectionFactory Has Not Been Provided");
}
// Create shared jms connection to a broker
connection = connectionFactory.createConnection();
state = ConnectionState.OPEN;
}
private void reinitializeClientListeners() {
for( BaseUIMAAsynchronousEngineCommon_impl client : clientList ) {
try {
client.initializeConsumer(brokerURL, connection);
} catch( Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"reinitializeClientListeners", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
}
}
}
}
private void forceTimeout(List<DelegateEntry> casList, BaseUIMAAsynchronousEngineCommon_impl client) throws Exception {
// Force timeout on all pending CASes. Replies will never come, we've lost broker
// connection.
Exception forcedTimeoutException = new MessageTimeoutException("Client Lost Connection To Broker. Forcing Timeout Exception");
ArrayList<DelegateEntry> copyOfPendingCasList = new ArrayList<DelegateEntry>(casList);
for( DelegateEntry entry : copyOfPendingCasList ) {
try {
ClientRequest cachedRequest = (ClientRequest) client.clientCache.get(entry.getCasReferenceId());
// Handle forced timeout. This method removes CAS from the list of CASes pending reply
client.handleException(forcedTimeoutException, entry.getCasReferenceId(), entry.getCasReferenceId(), cachedRequest, true);
} catch( Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"forceTimeout", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
}
}
}
}
public synchronized void retryConnectionUntilSuccessfull() {
// Check if the connection has been restored to the broker. Another thread
// may have previously recovered the connection here while we were blocked
// on entry to this method ( it is synchronized)
if ( isConnectionValid() ) {
return;
}
// Change state of each client in this JVM that uses this shared connection.
for(BaseUIMAAsynchronousEngineCommon_impl client: clientList) {
client.state = ClientState.RECONNECTING;
client.producerInitialized = false;
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "retryConnectionUntilSuccessfull",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_lost_connection_to_broker__WARNING",
new Object[] { brokerURL, (stop==true) });
}
// This loop attempts to recover broker connection every 5 seconds and ends when all clients
// using this shared object terminate or a connection is recovered
while( !stop ) {
if ( clientList.size() == 0 ) {
break; // no more active clients - break out of connection recovery
}
try {
// Attempt a new connection to a broker
create();
// Got it, start the connection
start();
// Forces clients to drop old Session, Temp Queue, and Consumer objects and create
// new ones. This is needs to be done after a new Connection is created.
reinitializeClientListeners();
synchronized( stateMonitor) {
state = ConnectionState.OPEN;
}
break;
} catch( Exception e) {
synchronized( stateMonitor ) {
try {
stateMonitor.wait(5000); // retry every 5 secs
} catch( InterruptedException ie) {}
}
}
}
if ( !stop ) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "retryConnectionUntilSuccessfull",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_recovered_connection__INFO",
new Object[] { brokerURL });
}
}
for(BaseUIMAAsynchronousEngineCommon_impl client: clientList) {
client.state = ClientState.RUNNING;
}
}
public void start() throws Exception {
if ( connectionValidator != null && connectionValidator.connectionClosedOrInvalid(connection) ) {
throw new ResourceInitializationException(new Exception("Unable to start JMS connection that is not open."));
}
connection.start();
}
public ConnectionState getState() {
synchronized( stateMonitor) {
return this.state;
}
}
public synchronized Connection getConnection() {
return connection;
}
public synchronized void setConnection(Connection connection) {
this.connection = connection;
}
public void registerClient(BaseUIMAAsynchronousEngineCommon_impl client) {
synchronized(mux) {
clientList.add(client);
}
}
public void unregisterClient(BaseUIMAAsynchronousEngineCommon_impl client) {
synchronized(mux) {
clientList.remove(client);
}
}
public int getClientCount() {
synchronized (mux) {
return clientList.size();
}
}
/**
* This method is called from stop(). It will stop the shared connection if all of the clients
* have already terminated
* @return
*/
public boolean destroy() {
return destroy(false);
}
public boolean destroy(boolean doShutdown) {
synchronized(destroyMux) {
// Check if all clients have terminated and only than stop the shared connection
if (getClientCount() == 0 && connection != null
&& !((ActiveMQConnection) connection).isClosed()
&& !((ActiveMQConnection) connection).isClosing()) {
try {
stop = true;
connection.stop();
connection.close();
while( !((ActiveMQConnection) connection).isClosed() ) {
try {
destroyMux.wait(100);
} catch( InterruptedException exx) {}
}
} catch (Exception e) {
/* ignore */
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "destroy",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_closed__INFO",
new Object[] { });
}
return true;
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "destroy",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_shared_connection_not_closed__INFO",
new Object[] { getClientCount() });
}
}
return false;
}
}
}
public static class UimaASShutdownHook implements Runnable {
UimaAsynchronousEngine asEngine=null;
public UimaASShutdownHook( UimaAsynchronousEngine asEngine) {
this.asEngine = asEngine;
}
public void run() {
try {
if ( asEngine != null ) {
asEngine.stop();
}
} catch( Exception ex) {
ex.printStackTrace();
}
}
}
}