blob: e688b73c00739b331f81adb8d57a774285c48138 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.adapter.jms.activemq;
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.ConnectException;
import java.net.InetAddress;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import javax.management.ServiceNotFoundException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.Channel;
import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.OutputChannel;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaSerializer;
import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.ErrorContext;
import org.apache.uima.aae.error.MessageTimeoutException;
import org.apache.uima.aae.error.ServiceShutdownException;
import org.apache.uima.aae.error.UimaEEServiceException;
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.jmx.ServicePerformance;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.UIMAMessage;
import org.apache.uima.aae.monitor.Monitor;
import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.impl.XmiSerializationSharedData;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.util.Level;
public class JmsOutputChannel implements OutputChannel {
private static final Class CLASS_NAME = JmsOutputChannel.class;
private static final long INACTIVITY_TIMEOUT = 1800000; // 30 minutes in term of millis
private CountDownLatch controllerLatch = new CountDownLatch(1);
private ActiveMQConnectionFactory connectionFactory;
// Name of the external queue this service uses to receive messages
private String serviceInputEndpoint;
// Name of the internal queue this services uses to receive messages from delegates
private String controllerInputEndpoint;
// Name of the queue used by Cas Multiplier to receive requests to free CASes
private String secondaryInputEndpoint;
// The service controller
private AnalysisEngineController analysisEngineController;
// Cache containing connections to destinations this service interacts with
// Each entry in this cache has an inactivity timer that times amount of time
// elapsed since the last time a message was sent to the destination.
private ConcurrentHashMap connectionMap = new ConcurrentHashMap();
private String serverURI;
private String serviceProtocolList = "";
private volatile boolean aborting = false;
private Destination freeCASTempQueue;
private String hostIP = null;
private UimaSerializer uimaSerializer = new UimaSerializer();
// By default every message will have expiration time added
private volatile boolean addTimeToLive = true;
public JmsOutputChannel() {
try {
hostIP = InetAddress.getLocalHost().getHostAddress();
} catch (Exception e) { /* silently deal with this error */
}
// Check the environment for existence of NoTTL tag. If present,
// the deployer of the service wants to avoid message expiration.
if (System.getProperty("NoTTL") != null) {
addTimeToLive = false;
}
}
/**
* Sets the ActiveMQ Broker URI
*/
public void setServerURI(String aServerURI) {
serverURI = aServerURI;
}
protected void setFreeCasQueue(Destination destination) {
freeCASTempQueue = destination;
}
public String getServerURI() {
return serverURI;
}
public String getName() {
return "";
}
/**
*
* @param connectionFactory
*/
public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public void setServiceInputEndpoint(String anEnpoint) {
serviceInputEndpoint = anEnpoint;
}
public void setSecondaryInputQueue(String anEndpoint) {
secondaryInputEndpoint = anEndpoint;
}
public ActiveMQConnectionFactory getConnectionFactory() {
return this.connectionFactory;
}
public void initialize() throws AsynchAEException {
if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "initialize",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_connector_list__FINE",
new Object[] { System.getProperty("ActiveMQConnectors") });
}
// Aggregate controller set this System property at startup in
serviceProtocolList = System.getProperty("ActiveMQConnectors");
}
}
/**
* Serializes CAS using indicated Serializer.
*
* @param aCAS
* - CAS instance to serialize
* @param aSerializerKey
* - a key identifying which serializer to use
* @return - String - serialized CAS as String
* @throws Exception
*/
public String serializeCAS(boolean isReply, CAS aCAS, String aCasReferenceId,
String aSerializerKey) throws Exception {
long start = getAnalysisEngineController().getCpuTime();
String serializedCas = null;
if (isReply || "xmi".equalsIgnoreCase(aSerializerKey)) {
CacheEntry cacheEntry = getAnalysisEngineController().getInProcessCache()
.getCacheEntryForCAS(aCasReferenceId);
XmiSerializationSharedData serSharedData;
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "serializeCAS",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_serialize_cas__FINE",
new Object[] { aCasReferenceId });
}
if (isReply) {
serSharedData = cacheEntry.getDeserSharedData();
if (cacheEntry.acceptsDeltaCas()
&& (cacheEntry.getMarker() != null && cacheEntry.getMarker().isValid())) {
serializedCas = uimaSerializer.serializeCasToXmi(aCAS, serSharedData, cacheEntry
.getMarker());
cacheEntry.setSentDeltaCas(true);
} else {
serializedCas = uimaSerializer.serializeCasToXmi(aCAS, serSharedData);
cacheEntry.setSentDeltaCas(false);
}
// if market is invalid, create a fresh marker.
if (cacheEntry.getMarker() != null && !cacheEntry.getMarker().isValid()) {
cacheEntry.setMarker(aCAS.createMarker());
}
} else {
serSharedData = cacheEntry.getDeserSharedData();
if (serSharedData == null) {
serSharedData = new XmiSerializationSharedData();
cacheEntry.setXmiSerializationData(serSharedData);
}
serializedCas = uimaSerializer.serializeCasToXmi(aCAS, serSharedData);
int maxOutgoingXmiId = serSharedData.getMaxXmiId();
// Save High Water Mark in case a merge is needed
getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId)
.setHighWaterMark(maxOutgoingXmiId);
}
} else if ("xcas".equalsIgnoreCase(aSerializerKey)) {
// Default is XCAS
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
uimaSerializer.serializeToXCAS(bos, aCAS, null, null, null);
serializedCas = bos.toString();
} catch (Exception e) {
throw e;
} finally {
bos.close();
}
}
LongNumericStatistic statistic;
if ((statistic = getAnalysisEngineController().getMonitor().getLongNumericStatistic("",
Monitor.TotalSerializeTime)) != null) {
statistic.increment(getAnalysisEngineController().getCpuTime() - start);
}
return serializedCas;
}
/**
* This method verifies that the destination (queue) exists. It opens a connection the a broker,
* creates a session and a message producer. Finally, using the message producer, sends an empty
* message to a queue. This API support enables checking for existence of the reply (temp) queue
* before any processing of a cas is done. This is an optimization to prevent expensive processing
* if the client destination is no longer available.
*/
public void bindWithClientEndpoint(Endpoint anEndpoint) throws Exception {
// check if the reply endpoint is a temp destination
if (anEndpoint.getDestination() != null) {
// create message producer if one doesnt exist for this destination
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
// Create empty message
TextMessage tm = endpointConnection.produceTextMessage("");
// test sending a message to reply endpoint. This tests existence of
// a temp queue. If the client has been shutdown, this will fail
// with an exception.
endpointConnection.send(tm, 0, false);
}
}
private long getInactivityTimeout(String destination, String brokerURL) {
if (System.getProperty(JmsConstants.SessionTimeoutOverride) != null) {
try {
long overrideTimeoutValue = Long.parseLong(System
.getProperty(JmsConstants.SessionTimeoutOverride));
// endpointConnection.setInactivityTimeout(overrideTimeoutValue); // If the connection is
// not used within this interval it will be removed
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"getEndpointConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_override_connection_timeout__FINE",
new Object[] { analysisEngineController, overrideTimeoutValue, destination,
brokerURL });
}
return overrideTimeoutValue;
} catch (NumberFormatException e) {
/* ignore. use the default */}
} else {
// endpointConnection.setInactivityTimeout(INACTIVITY_TIMEOUT); // If the connection is not
// used within this interval it will be removed
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME)
.logrb(
Level.FINE,
CLASS_NAME.getName(),
"getEndpointConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_connection_timeout__FINE",
new Object[] { analysisEngineController, INACTIVITY_TIMEOUT, destination,
brokerURL });
}
}
return (int) INACTIVITY_TIMEOUT; // default
}
/**
* Stop JMS connection and close all sessions associated with this connection
*
* @param brokerConnectionEntry
*/
private void invalidateConnectionAndEndpoints(BrokerConnectionEntry brokerConnectionEntry ) {
Connection conn = brokerConnectionEntry.getConnection();
try {
if ( conn != null && ((ActiveMQConnection)conn).isClosed()) {
brokerConnectionEntry.getConnection().stop();
brokerConnectionEntry.getConnection().close();
brokerConnectionEntry.setConnection(null);
for (Entry<Object, JmsEndpointConnection_impl> endpoints : brokerConnectionEntry.endpointMap
.entrySet()) {
endpoints.getValue().close(); // close session and producer
}
}
} catch (Exception e) {
// Ignore this for now. Attempting to close connection that has been closed
// Ignore we are shutting down
} finally {
brokerConnectionEntry.endpointMap.clear();
connectionMap.remove(brokerConnectionEntry.getBrokerURL());
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"invalidateConnectionAndEndpoints",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_service_closing_connection__INFO",
new Object[] { getAnalysisEngineController().getComponentName(),
brokerConnectionEntry.getBrokerURL() });
}
}
brokerConnectionEntry.setConnection(null);
}
private String getDestinationName(Endpoint anEndpoint) {
String destination = anEndpoint.getEndpoint();
if (anEndpoint.getDestination() != null
&& anEndpoint.getDestination() instanceof ActiveMQDestination) {
destination = ((ActiveMQDestination) anEndpoint.getDestination()).getPhysicalName();
}
return destination;
}
private String getLookupKey(Endpoint anEndpoint) {
String key = anEndpoint.getEndpoint() + anEndpoint.getServerURI();
String destination = getDestinationName(anEndpoint);
if ( anEndpoint.getDelegateKey() != null ) {
key = anEndpoint.getDelegateKey() + "-"+destination;
} else {
key = "Client-"+destination;
}
return key;
}
private BrokerConnectionEntry createConnectionEntry(String brokerURL) {
BrokerConnectionEntry brokerConnectionEntry = new BrokerConnectionEntry();
connectionMap.put(brokerURL, brokerConnectionEntry);
ConnectionTimer connectionTimer = new ConnectionTimer(brokerConnectionEntry);
connectionTimer.setAnalysisEngineController(getAnalysisEngineController());
brokerConnectionEntry.setConnectionTimer(connectionTimer);
return brokerConnectionEntry;
}
/**
* Returns {@link JmsEndpointConnection_impl} instance bound to a destination defined in the
* {@link Endpoint} The endpoint identifies the destination that should receive the message. This
* method refrences a cache that stores active connections. Active connections are those that are
* fully bound and being used for communication. The key to locate the entry in the connection
* cache is the queue name + broker URI. This uniquely identifies the destination. If an entry
* does not exist in the cache, this routine will create a new connection, initialize it, and
* cache it for future use. The cache is purely for optimization, to prevent openinig a connection
* for every message which is a costly operation. Instead the connection is open, cached and
* reused. The {@link JmsEndpointConnection_impl} instance is stored in the cache, and uses a
* timer to make sure stale connection are removed. If a connection is not used in a given time
* interval, the connection is considered stale and is dropped from the cache.
*
* @param anEndpoint
* - endpoint configuration containing connection information to a destination
* @return -
* @throws AsynchAEException
*/
private synchronized JmsEndpointConnection_impl getEndpointConnection(Endpoint anEndpoint)
throws AsynchAEException, ServiceShutdownException, ConnectException {
try {
controllerLatch.await();
} catch (InterruptedException e) {
}
JmsEndpointConnection_impl endpointConnection = null;
// First get a Map containing destinations managed by a broker provided by the client
BrokerConnectionEntry brokerConnectionEntry = null;
if (connectionMap.containsKey(anEndpoint.getServerURI())) {
brokerConnectionEntry = (BrokerConnectionEntry) connectionMap.get(anEndpoint.getServerURI());
// Findbugs thinks that the above may return null, perhaps due to a race condition. Add
// the null check just in case
if (brokerConnectionEntry == null) {
throw new AsynchAEException("Controller:"
+ getAnalysisEngineController().getComponentName()
+ " Unable to Lookup Broker Connection For URL:" + anEndpoint.getServerURI());
}
brokerConnectionEntry.setBrokerURL(anEndpoint.getServerURI());
if ( JmsEndpointConnection_impl.connectionClosedOrFailed(brokerConnectionEntry) ) {
invalidateConnectionAndEndpoints(brokerConnectionEntry);
brokerConnectionEntry = createConnectionEntry(anEndpoint.getServerURI());
}
} else {
brokerConnectionEntry = createConnectionEntry(anEndpoint.getServerURI());
}
String key = getLookupKey(anEndpoint);
String destination = getDestinationName(anEndpoint);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"getEndpointConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_acquiring_connection_to_endpoint__FINE",
new Object[] { getAnalysisEngineController().getComponentName(), destination,
anEndpoint.getServerURI() });
}
// check the cache first
if (!brokerConnectionEntry.endpointExists(key)) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"getEndpointConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_create_new_connection__FINE",
new Object[] { getAnalysisEngineController().getComponentName(), destination,
anEndpoint.getServerURI() });
}
endpointConnection = new JmsEndpointConnection_impl(brokerConnectionEntry, anEndpoint,
getAnalysisEngineController());
brokerConnectionEntry.addEndpointConnection(key, endpointConnection);
long replyQueueInactivityTimeout = getInactivityTimeout(destination, anEndpoint
.getServerURI());
brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(replyQueueInactivityTimeout);
// Connection is not in the cache, create a new connection, initialize it and cache it
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"getEndpointConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_open_new_connection_to_endpoint__FINE",
new Object[] { getDestinationName(anEndpoint), anEndpoint.getServerURI() });
}
/**
* Open connection to a broker, create JMS session and MessageProducer
*/
endpointConnection.open();
brokerConnectionEntry.getConnectionTimer().setConnectionCreationTimestamp(
endpointConnection.connectionCreationTimestamp);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"getEndpointConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_connection_opened_to_endpoint__FINE",
new Object[] { getAnalysisEngineController().getComponentName(), destination,
anEndpoint.getServerURI() });
}
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"getEndpointConnection",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_reusing_existing_connection__FINE",
new Object[] { getAnalysisEngineController().getComponentName(), destination,
anEndpoint.getServerURI() });
}
// Retrieve connection from the connection cache
endpointConnection = brokerConnectionEntry.getEndpointConnection(key);
// check the state of the connection and re-open it if necessary
if (endpointConnection != null && !endpointConnection.isOpen()) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"getEndpointConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_connection_closed_reopening_endpoint__FINE",
new Object[] { destination });
}
endpointConnection.open();
brokerConnectionEntry.getConnectionTimer()
.setConnectionCreationTimestamp(System.nanoTime());
}
}
return endpointConnection;
}
/**
* Sends request message to a delegate.
*
* @param aCommand
* - the type of request [Process|GetMeta]
* @param anEndpoint
* - the destination where the delegate receives messages
*
* @throws AsynchAEException
*/
public void sendRequest(int aCommand, String aCasReferenceId, Endpoint anEndpoint)
throws AsynchAEException {
try {
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
TextMessage tm = endpointConnection.produceTextMessage("");
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
populateHeaderWithRequestContext(tm, anEndpoint, aCommand);
if (aCommand == AsynchAEMessage.ReleaseCAS || aCommand == AsynchAEMessage.Stop) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"sendRequest",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_release_cas_req__FINE",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint(),
aCasReferenceId });
}
}
// Only used to send a Stop or ReleaseCas request so probably no need to start a connection
// timer ?
endpointConnection.send(tm, 0, true);
} catch (JMSException e) {
// Unable to establish connection to the endpoint. Logit and continue
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendRequest",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
}
} catch (ServiceShutdownException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendRequest",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_shutdown__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
}
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
/**
* Sends request message to a delegate.
*
* @param aCommand
* - the type of request [Process|GetMeta]
* @param anEndpoint
* - the destination where the delegate receives messages
*
* @throws AsynchAEException
*/
public void sendRequest(int aCommand, Endpoint anEndpoint) {
Delegate delegate = null;
try {
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
TextMessage tm = endpointConnection.produceTextMessage("");
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
tm.setText(""); // Need this to prevent the Broker from throwing an exception when sending a
// message to C++ service
populateHeaderWithRequestContext(tm, anEndpoint, aCommand);
// For remotes add a special property to the message. This property
// will be echoed back by the service. This property enables matching
// the reply with the right endpoint object managed by the aggregate.
if (anEndpoint.isRemote()) {
tm.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getServerURI());
}
boolean startTimer = false;
// Start timer for endpoints that are remote and are managed by a different broker
// than this service. If an endpoint contains a destination object, the outgoing
// request will contain a JMSReplyTo object which will point to a temp queue
if (anEndpoint.isRemote() && anEndpoint.getDestination() == null) {
startTimer = true;
}
if (aCommand == AsynchAEMessage.CollectionProcessComplete) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"sendRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_send_cpc_req__FINE", new Object[] { anEndpoint.getEndpoint() });
}
} else if (aCommand == AsynchAEMessage.ReleaseCAS) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"sendRequest",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_releasecas_request__endpoint__FINEST",
new Object[] { getAnalysisEngineController().getName(),
endpointConnection.getEndpoint() });
}
} else if (aCommand == AsynchAEMessage.GetMeta) {
if (anEndpoint.getDestination() != null) {
String replyQueueName = ((ActiveMQDestination) anEndpoint.getDestination())
.getPhysicalName().replaceAll(":", "_");
if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
String delegateKey = ((AggregateAnalysisEngineController) getAnalysisEngineController())
.lookUpDelegateKey(anEndpoint.getEndpoint());
ServiceInfo serviceInfo = ((AggregateAnalysisEngineController) getAnalysisEngineController())
.getDelegateServiceInfo(delegateKey);
if (serviceInfo != null) {
serviceInfo.setReplyQueueName(replyQueueName);
serviceInfo.setServiceKey(delegateKey);
}
delegate = lookupDelegate(delegateKey);
if (delegate.getGetMetaTimeout() > 0) {
delegate.startGetMetaRequestTimer();
}
}
} else if (!anEndpoint.isRemote()) {
ServiceInfo serviceInfo = ((AggregateAnalysisEngineController) getAnalysisEngineController())
.getServiceInfo();
if (serviceInfo != null) {
serviceInfo.setReplyQueueName(controllerInputEndpoint);
}
}
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"sendRequest",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_metadata_request__endpoint__FINEST",
new Object[] { endpointConnection.getEndpoint(),
endpointConnection.getServerUri() });
}
}
if (endpointConnection.send(tm, 0, startTimer) != true) {
throw new ServiceNotFoundException();
}
} catch (Exception e) {
if (delegate != null && aCommand == AsynchAEMessage.GetMeta) {
delegate.cancelDelegateTimer();
}
// Handle the error
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, aCommand);
errorContext.add(AsynchAEMessage.Endpoint, anEndpoint);
getAnalysisEngineController().getErrorHandlerChain().handle(e, errorContext,
getAnalysisEngineController());
}
}
public void sendRequest(String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendRequest",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
new Object[] { anEndpoint.getEndpoint() });
}
try {
if (anEndpoint.isRemote()) {
if (anEndpoint.getSerializer().equals("xmi")) {
String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId, anEndpoint,
anEndpoint.isRetryEnabled());
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"sendRequest",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_sending_serialized_cas__FINEST",
new Object[] { getAnalysisEngineController().getComponentName(),
anEndpoint.getEndpoint(), aCasReferenceId, serializedCAS });
}
// Send process request to remote delegate and start timeout timer
sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
} else {
byte[] serializedCAS = getBinaryCasAndReleaseIt(false, aCasReferenceId, anEndpoint,
anEndpoint.isRetryEnabled());
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"sendRequest",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_sending_binary_cas__FINEST",
new Object[] { getAnalysisEngineController().getComponentName(),
anEndpoint.getEndpoint(), aCasReferenceId, serializedCAS });
}
// Send process request to remote delegate and start timeout timer
sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
}
} else {
// Not supported
}
} catch (ServiceShutdownException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
}
catch (AsynchAEException e) {
throw e;
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
/**
* Sends request message to process CAS to the given destinations. This method enables processing
* of the same CAS in multiple Analysis Engines at the same time.
*
* @param aCasReferenceId
* @param anEndpoint
* @throws AsynchAEException
*/
public void sendRequest(String aCasReferenceId, Endpoint[] endpoints) throws AsynchAEException {
Endpoint currentEndpoint = null;
try {
boolean cacheSerializedCas = endpointRetryEnabled(endpoints);
// The default serialization strategy for parallel step is xmi.
// Binary serialization doesnt support merge.
endpoints[0].setSerializer("xmi");
// Serialize CAS using serializer defined in the first endpoint. All endpoints in the parallel
// Flow
// must use the same format (either XCAS or XMI)
String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId, endpoints[0],
cacheSerializedCas);
// Using provided endpoints, create JMS message for each destination and sent the serialized
// CAS to it.
for (int i = 0; i < endpoints.length; i++) {
// For remote delegates, optionally cache serialized CAS in case a retry on timeout is
// required.
if (endpoints[i].isRemote()) {
currentEndpoint = endpoints[i];
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"sendRequest",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_sending_serialized_cas__FINEST",
new Object[] { getAnalysisEngineController().getComponentName(),
endpoints[i].getEndpoint(), aCasReferenceId, serializedCAS });
}
// The default serialization strategy for parallel step is xmi.
// Binary serialization doesnt support merge.
endpoints[i].setSerializer("xmi");
sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, endpoints[i], true, 0);
} else {
// Currently this use case is not supported. Parallel processing of CAS is only supported
// with remote Delegates
}
}
} catch (Exception e) {
// Handle the error
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
errorContext.add(AsynchAEMessage.Endpoint, currentEndpoint);
errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
getAnalysisEngineController().getErrorHandlerChain().handle(e, errorContext,
getAnalysisEngineController());
}
}
public void sendReply(CAS aCas, String anInputCasReferenceId, String aNewCasReferenceId,
Endpoint anEndpoint, long sequence) throws AsynchAEException {
try {
anEndpoint.setReplyEndpoint(true);
if (anEndpoint.isRemote()) {
// Serializes CAS and releases it back to CAS Pool
String serializedCAS = getSerializedCas(true, aNewCasReferenceId, anEndpoint, anEndpoint
.isRetryEnabled());
sendCasToRemoteEndpoint(false, serializedCAS, anInputCasReferenceId, aNewCasReferenceId,
anEndpoint, false, sequence);
} else {
// Not supported
}
} catch (ServiceShutdownException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
} catch (AsynchAEException e) {
throw e;
}
catch (Exception e) {
throw new AsynchAEException(e);
}
}
public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
try {
anEndpoint.setReplyEndpoint(true);
if (anEndpoint.isRemote()) {
if (anEndpoint.getSerializer().equals("xmi")) {
// Serializes CAS and releases it back to CAS Pool
String serializedCAS = getSerializedCas(true, entry.getCasReferenceId(), anEndpoint,
anEndpoint.isRetryEnabled());
sendCasToRemoteEndpoint(false, serializedCAS, entry, anEndpoint, false);
} else {
byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint
.isRetryEnabled());
if (binaryCas == null) {
return;
}
sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
}
} else {
// Not supported
}
} catch (ServiceShutdownException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
} catch (AsynchAEException e) {
throw e;
}
catch (Exception e) {
throw new AsynchAEException(e);
}
}
public void sendReply(int aCommand, Endpoint anEndpoint, String aCasReferenceId)
throws AsynchAEException {
try {
if (aborting) {
return;
}
anEndpoint.setReplyEndpoint(true);
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
TextMessage tm = endpointConnection.produceTextMessage("");
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
populateHeaderWithResponseContext(tm, anEndpoint, aCommand);
tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
// If this service is a Cas Multiplier add to the message a FreeCasQueue.
// The client may need send Stop request to that queue.
if (aCommand == AsynchAEMessage.ServiceInfo
&& getAnalysisEngineController().isCasMultiplier() && freeCASTempQueue != null) {
// Attach a temp queue to the outgoing message. This a queue where
// Free CAS notifications need to be sent from the client
tm.setJMSReplyTo(freeCASTempQueue);
}
endpointConnection.send(tm, 0, false);
addIdleTime(tm);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"sendReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_cpc_reply_sent__FINE",
new Object[] { getAnalysisEngineController().getComponentName(),
anEndpoint.getEndpoint() });
}
} catch (JMSException e) {
// Unable to establish connection to the endpoint. Logit and continue
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "sendReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
new Object[] { e });
}
}
catch (ServiceShutdownException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
}
catch (AsynchAEException e) {
throw e;
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
public void sendReply(int aCommand, Endpoint anEndpoint) throws AsynchAEException {
anEndpoint.setReplyEndpoint(true);
try {
if (aborting) {
return;
}
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
TextMessage tm = endpointConnection.produceTextMessage("");
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
populateHeaderWithResponseContext(tm, anEndpoint, aCommand);
endpointConnection.send(tm, 0, false);
addIdleTime(tm);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"sendReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_cpc_reply_sent__FINE",
new Object[] { getAnalysisEngineController().getComponentName(),
anEndpoint.getEndpoint() });
}
} catch (JMSException e) {
// Unable to establish connection to the endpoint. Logit and continue
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "sendReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
new Object[] { e });
}
}
catch (ServiceShutdownException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
}
catch (AsynchAEException e) {
throw e;
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
/**
* Sends JMS Reply Message to a given endpoint. The reply message contains given Throwable (with
* full stack)
*
* @param t
* - Throwable to include in the reply message
* @param anEndpoint
* - an endpoint to receive the reply message
* @param aCasReferenceId
* - a unique CAS reference id
*
* @throws AsynchAEException
*/
public void sendReply(String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException {
anEndpoint.setReplyEndpoint(true);
try {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_replyto_endpoint__FINE",
new Object[] { anEndpoint.getEndpoint(), aCasReferenceId });
}
if (anEndpoint.isRemote()) {
CacheEntry entry = null;
try {
entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(
aCasReferenceId);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"sendReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_cas_not_found__INFO",
new Object[] { getAnalysisEngineController().getComponentName(),
anEndpoint.getEndpoint(), aCasReferenceId });
}
return;
}
if (anEndpoint.getSerializer().equals("xmi")) {
// Serializes CAS and releases it back to CAS Pool
String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false);
sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0);
} else {
byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint
.isRetryEnabled());
if (binaryCas == null) {
return;
}
sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
}
} else {
// Not supported
}
} catch (ServiceShutdownException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
}
catch (AsynchAEException e) {
throw e;
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
/**
* Sends JMS Reply Message to a given endpoint. The reply message contains given Throwable (with
* full stack)
*
* @param t
* - Throwable to include in the reply message
* @param anEndpoint
* - an endpoint to receive the reply message
* @param aCasReferenceId
* - a unique CAS reference id
*
* @throws AsynchAEException
*/
public void sendReply(Throwable t, String aCasReferenceId, String aParentCasReferenceId,
Endpoint anEndpoint, int aCommand) throws AsynchAEException {
anEndpoint.setReplyEndpoint(true);
try {
Throwable wrapper = null;
if (!(t instanceof UimaEEServiceException)) {
// Strip off AsyncAEException and replace with UimaEEServiceException
if (t instanceof AsynchAEException && t.getCause() != null) {
wrapper = new UimaEEServiceException(t.getCause());
} else {
wrapper = new UimaEEServiceException(t);
}
}
if (aborting) {
return;
}
anEndpoint.setReplyEndpoint(true);
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
// Create Message that will contain serialized Exception with stack
ObjectMessage om = endpointConnection.produceObjectMessage();
// Now try to catch non-serializable exception. The Throwable passed into this method may
// not be serializable. Catch the exception, and create a wrapper containing stringified
// stack trace.
try {
// serialize the Throwable
if (wrapper == null) {
om.setObject(t);
} else {
om.setObject(wrapper);
}
} catch( RuntimeException e) {
// Check if we failed due to non-serializable object in the Throwable
if ( e.getCause() != null && e.getCause() instanceof NotSerializableException ) {
// stringify the stack trace
StringWriter sw = new StringWriter();
t.printStackTrace(new PrintWriter(sw));
wrapper = new UimaEEServiceException(sw.toString());
// serialize the new wrapper
om.setObject(wrapper);
} else {
throw e; // rethrow
}
} catch( Exception e) {
throw e; // rethrow
}
// Add common header properties
populateHeaderWithResponseContext(om, anEndpoint, aCommand); // AsynchAEMessage.Process);
om.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Exception);
if (aCasReferenceId != null) {
om.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
if (aParentCasReferenceId != null) {
om.setStringProperty(AsynchAEMessage.InputCasReference, aParentCasReferenceId);
}
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_exception__FINE",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
}
// Dispatch Message to destination
endpointConnection.send(om, 0, false);
addIdleTime(om);
} catch (JMSException e) {
// Unable to establish connection to the endpoint. Logit and continue
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
}
} catch (ServiceShutdownException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
} catch (AsynchAEException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
}
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
/**
*
* @param aMetadata
* @param anEndpoint
* @throws AsynchAEException
*/
public void sendReply(ProcessingResourceMetaData aProcessingResourceMetadata,
Endpoint anEndpoint, boolean serialize) throws AsynchAEException {
if (aborting) {
return;
}
long msgSize = 0;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
anEndpoint.setReplyEndpoint(true);
// Initialize JMS connection to given endpoint
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "sendReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_produce_txt_msg__FINE",
new Object[] {});
}
TextMessage tm = endpointConnection.produceTextMessage("");
// Collocated Aggregate components dont send metadata just empty reply
// Such aggregate has merged its typesystem already since it shares
// CasManager with its parent
if (serialize) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_serializing_meta__FINE", new Object[] {});
}
// Serialize metadata
aProcessingResourceMetadata.toXML(bos);
tm.setText(bos.toString());
msgSize = bos.toString().length();
}
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Metadata);
// This service supports Binary Serialization
tm.setIntProperty(AsynchAEMessage.Serialization, AsynchAEMessage.BinarySerialization);
populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.GetMeta);
if (freeCASTempQueue != null) {
// Attach a temp queue to the outgoing message. This a queue where
// Free CAS notifications need to be sent from the client
tm.setJMSReplyTo(freeCASTempQueue);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "sendReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_metadata_reply__endpoint__FINEST",
new Object[] { serviceInputEndpoint, anEndpoint.getEndpoint() });
}
endpointConnection.send(tm, msgSize, false);
} catch (JMSException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
// Unable to establish connection to the endpoint. Log it and continue
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendReply",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
}
}
catch (ServiceShutdownException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
throw new AsynchAEException(e);
} finally {
try {
bos.close();
} catch (Exception e) {
}
}
}
private byte[] getBinaryCas(boolean isReply, String aCasReferenceId, Endpoint anEndpoint,
boolean cacheSerializedCas) throws Exception {
CAS cas = null;
try {
byte[] serializedCAS = null;
// Using Cas reference Id retrieve CAS from the shared Cash
cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(
aCasReferenceId);
long t1 = getAnalysisEngineController().getCpuTime();
// Serialize CAS for remote Delegates
String serializer = anEndpoint.getSerializer();
if (cas == null || entry == null) {
return null;
}
if (serializer.equals("binary")) {
if (entry.acceptsDeltaCas() && isReply) {
if (entry.getMarker() != null && entry.getMarker().isValid()) {
serializedCAS = uimaSerializer.serializeCasToBinary(cas, entry.getMarker());
entry.setSentDeltaCas(true);
} else {
serializedCAS = uimaSerializer.serializeCasToBinary(cas);
entry.setSentDeltaCas(false);
}
} else {
serializedCAS = uimaSerializer.serializeCasToBinary(cas);
entry.setSentDeltaCas(false);
}
// create a fresh marker
if (entry.getMarker() != null && !entry.getMarker().isValid()) {
entry.setMarker(cas.createMarker());
}
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"getBinaryCas",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_invalid_serializer__WARNING",
new Object[] { getAnalysisEngineController().getName(), serializer,
anEndpoint.getEndpoint() });
}
throw new UimaEEServiceException("Invalid Serializer:" + serializer + " For Endpoint:"
+ anEndpoint.getEndpoint());
}
long timeToSerializeCas = getAnalysisEngineController().getCpuTime() - t1;
getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas);
entry.incrementTimeToSerializeCAS(timeToSerializeCas);
casStats.incrementCasSerializationTime(timeToSerializeCas);
getAnalysisEngineController().getServicePerformance().incrementCasSerializationTime(
timeToSerializeCas);
return serializedCAS;
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
private String getSerializedCas(boolean isReply, String aCasReferenceId, Endpoint anEndpoint,
boolean cacheSerializedCas) throws Exception {
CAS cas = null;
try {
String serializedCAS = null;
// Using Cas reference Id retrieve CAS from the shared Cash
cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
if (cas == null) {
serializedCAS = getAnalysisEngineController().getInProcessCache().getSerializedCAS(
aCasReferenceId);
} else {
CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(
aCasReferenceId);
long t1 = getAnalysisEngineController().getCpuTime();
// Serialize CAS for remote Delegates
String serializer = anEndpoint.getSerializer();
if (serializer == null || serializer.trim().length() == 0) {
serializer = "xmi";
}
serializedCAS = serializeCAS(isReply, cas, aCasReferenceId, serializer);
long timeToSerializeCas = getAnalysisEngineController().getCpuTime() - t1;
getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas);
entry.incrementTimeToSerializeCAS(timeToSerializeCas);
casStats.incrementCasSerializationTime(timeToSerializeCas);
getAnalysisEngineController().getServicePerformance().incrementCasSerializationTime(
timeToSerializeCas);
if (cacheSerializedCas) {
getAnalysisEngineController().getInProcessCache().saveSerializedCAS(aCasReferenceId,
serializedCAS);
}
}
return serializedCAS;
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
private byte[] getBinaryCasAndReleaseIt(boolean isReply, String aCasReferenceId,
Endpoint anEndpoint, boolean cacheSerializedCas) throws Exception {
try {
return getBinaryCas(isReply, aCasReferenceId, anEndpoint, cacheSerializedCas);
} catch (Exception e) {
throw new AsynchAEException(e);
} finally {
if (getAnalysisEngineController() instanceof PrimitiveAnalysisEngineController
&& anEndpoint.isRemote()) {
getAnalysisEngineController().dropCAS(aCasReferenceId, true);
}
}
}
private String getSerializedCasAndReleaseIt(boolean isReply, String aCasReferenceId,
Endpoint anEndpoint, boolean cacheSerializedCas) throws Exception {
try {
return getSerializedCas(isReply, aCasReferenceId, anEndpoint, cacheSerializedCas);
} catch (Exception e) {
throw new AsynchAEException(e);
} finally {
if (getAnalysisEngineController() instanceof PrimitiveAnalysisEngineController
&& anEndpoint.isRemote()) {
getAnalysisEngineController().dropCAS(aCasReferenceId, true);
}
}
}
private boolean endpointRetryEnabled(Endpoint[] endpoints) {
for (int i = 0; i < endpoints.length; i++) {
if (endpoints[i].isRetryEnabled()) {
return true;
}
}
return false;
}
private void populateStats(Message aTextMessage, Endpoint anEndpoint, String aCasReferenceId,
int anAdminCommand, boolean isRequest) throws Exception {
if (anEndpoint.isFinal()) {
aTextMessage.setLongProperty("SENT-TIME", System.nanoTime());
}
if (anAdminCommand == AsynchAEMessage.Process) {
if (isRequest) {
long departureTime = System.nanoTime();
getAnalysisEngineController().saveTime(departureTime, aCasReferenceId,
anEndpoint.getEndpoint());
} else {
ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(
aCasReferenceId);
aTextMessage.setLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats
.getRawCasSerializationTime());
aTextMessage.setLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats
.getRawCasDeserializationTime());
aTextMessage.setLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats
.getRawAnalysisTime());
aTextMessage.setLongProperty(AsynchAEMessage.TimeWaitingForCAS,
getAnalysisEngineController().getServicePerformance().getTimeWaitingForCAS());
long iT = getAnalysisEngineController().getIdleTimeBetweenProcessCalls(
AsynchAEMessage.Process);
aTextMessage.setLongProperty(AsynchAEMessage.IdleTime, iT);
String lookupKey = getAnalysisEngineController().getName();
long arrivalTime = getAnalysisEngineController().getTime(aCasReferenceId, lookupKey); // serviceInputEndpoint);
long timeInService = getAnalysisEngineController().getCpuTime() - arrivalTime;
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"populateStats", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_timein_service__FINEST",
new Object[] { serviceInputEndpoint, (double) timeInService / (double) 1000000 });
}
}
}
}
private long getCommandTimeoutValue(Endpoint anEndpoint, int aCommand) {
switch (aCommand) {
case AsynchAEMessage.GetMeta:
return anEndpoint.getMetadataRequestTimeout();
case AsynchAEMessage.Process:
return anEndpoint.getProcessRequestTimeout();
}
return 0; // no match for the command
}
/**
* Adds Request specific properties to the JMS Header.
*
* @param aMessage
* @param anEndpoint
* @param aCommand
* @throws Exception
*/
private void populateHeaderWithRequestContext(Message aMessage, Endpoint anEndpoint, int aCommand)
throws Exception {
aMessage.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
aMessage.setIntProperty(AsynchAEMessage.Command, aCommand);
// TODO override default based on system property
aMessage.setBooleanProperty(AsynchAEMessage.AcceptsDeltaCas, true);
long timeout = getCommandTimeoutValue(anEndpoint, aCommand);
// If the timeout is defined in the Deployment Descriptor and
// the service is configured to use time to live (TTL), add
// JMS message expiration time. The TTL is by default always
// added to the message. To override this add "-DNoTTL" to the
// command line.
if (timeout > 0 && addTimeToLive) {
Delegate delegate = lookupDelegate(anEndpoint.getDelegateKey());
long ttl = timeout;
// How many CASes are in the list of CASes pending reply for this delegate
int currentOutstandingCasListSize = delegate.getCasPendingReplyListSize();
if (currentOutstandingCasListSize > 0) {
// increase the time-to-live
ttl *= currentOutstandingCasListSize;
}
aMessage.setJMSExpiration(ttl);
}
if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
aMessage.setStringProperty(AsynchAEMessage.MessageFrom, controllerInputEndpoint);
if (anEndpoint.isRemote()) {
String protocol = serviceProtocolList;
if (anEndpoint.getServerURI().trim().toLowerCase().startsWith("http")
|| (anEndpoint.getReplyToEndpoint() != null && anEndpoint.getReplyToEndpoint()
.trim().length() > 0)) {
protocol = anEndpoint.getServerURI().trim();
// protocol = extractURLWithProtocol(serviceProtocolList, "http");
// get the replyto endpoint name
String replyTo = anEndpoint.getReplyToEndpoint();
if (replyTo == null && anEndpoint.getDestination() == null) {
throw new AsynchAEException(
"replyTo endpoint name not specified for HTTP-based endpoint:"
+ anEndpoint.getEndpoint());
}
if (replyTo == null) {
replyTo = "";
}
aMessage.setStringProperty(AsynchAEMessage.MessageFrom, replyTo);
}
Object destination;
if ((destination = anEndpoint.getDestination()) != null) {
aMessage.setJMSReplyTo((Destination) destination);
aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
} else {
aMessage.setStringProperty(UIMAMessage.ServerURI, protocol);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"populateHeaderWithRequestContext",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_sending_new_msg_to_remote_FINE",
new Object[] { getAnalysisEngineController().getComponentName(),
anEndpoint.getServerURI(), anEndpoint.getEndpoint() });
}
} else // collocated
{
aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
}
}
}
/**
* Adds Response specific properties to the JMS Header
*
* @param aMessage
* @param anEndpoint
* @param aCommand
* @throws Exception
*/
private void populateHeaderWithResponseContext(Message aMessage, Endpoint anEndpoint, int aCommand)
throws Exception {
aMessage.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Response);
aMessage.setIntProperty(AsynchAEMessage.Command, aCommand);
aMessage.setStringProperty(AsynchAEMessage.MessageFrom, serviceInputEndpoint);
if (anEndpoint.isRemote()) {
aMessage.setStringProperty(UIMAMessage.ServerURI, getServerURI());
if (hostIP != null) {
aMessage.setStringProperty(AsynchAEMessage.ServerIP, hostIP);
}
if (anEndpoint.getEndpointServer() != null) {
aMessage.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getEndpointServer());
}
} else {
aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
}
}
public AnalysisEngineController getAnalysisEngineController() {
return analysisEngineController;
}
public void setController(AnalysisEngineController analysisEngineController) {
this.analysisEngineController = analysisEngineController;
controllerLatch.countDown();
}
public String getControllerInputEndpoint() {
return controllerInputEndpoint;
}
public void setControllerInputEndpoint(String controllerInputEndpoint) {
this.controllerInputEndpoint = controllerInputEndpoint;
}
private void dispatch(Message aMessage, Endpoint anEndpoint, CacheEntry entry, boolean isRequest,
JmsEndpointConnection_impl endpointConnection, long msgSize) throws Exception {
// Add stats
populateStats(aMessage, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process,
isRequest);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"dispatch",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_sending_new_msg_to_remote_FINE",
new Object[] { getAnalysisEngineController().getName(),
endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
}
// By default start a timer associated with a connection to the endpoint. Once a connection is
// established with an
// endpoint it is cached and reused for subsequent messaging. If the connection is not used
// within a given interval
// the timer silently expires and closes the connection. This mechanism is similar to what Web
// Server does when
// managing sessions. In case when we want the remote delegate to respond to a temporary queue,
// which is implied
// by anEndpoint.getDestination != null, we dont start the timer.
boolean startConnectionTimer = isRequest ? false : true; // connection time is for replies
// ----------------------------------------------------
// Send Request Messsage to the Endpoint
// ----------------------------------------------------
// Add the CAS to the delegate's list of CASes pending reply. Do the add before
// the send to eliminate a race condition where the reply is received (on different
// thread) *before* the CAS is added to the list.
if (isRequest) {
anEndpoint.setWaitingForResponse(true);
// Add CAS to the list of CASes pending reply
addCasToOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
} else {
addIdleTime(aMessage);
}
// If the send fails it returns false.
if (endpointConnection.send(aMessage, msgSize, startConnectionTimer) == false) {
// Failure on sending a request requires cleanup that includes stopping a listener
// on the delegate that we were unable to send a message to. The delegate state is
// set to FAILED. If there are retries or more CASes to send to this delegate the
// connection will be retried.
if (isRequest) {
// Spin recovery thread to handle send error. After the recovery thread
// is started the current (process) thread goes back to a thread pool in
// ThreadPoolExecutor. The recovery thread can than stop the listener and the
// ThreadPoolExecutor since all threads are back in the pool. Any retries will
// be done in the recovery thread.
RecoveryThread recoveryThread = new RecoveryThread(this, anEndpoint, entry, isRequest,
getAnalysisEngineController());
Thread t = new Thread(Thread.currentThread().getThreadGroup().getParent(), recoveryThread);
t.start();
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"dispatch",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_send_reply_failed__INFO",
new Object[] { getAnalysisEngineController().getComponentName(),
endpointConnection.getServerUri(), endpointConnection.getEndpoint() });
}
}
}
}
private void sendCasToRemoteEndpoint(boolean isRequest, String aSerializedCAS,
String anInputCasReferenceId, String aCasReferenceId, Endpoint anEndpoint,
boolean startTimer, long sequence) throws AsynchAEException, ServiceShutdownException {
long msgSize = 0;
try {
if (aborting) {
return;
}
CacheEntry entry = this.getCacheEntry(aCasReferenceId);
if (entry == null) {
throw new AsynchAEException("Controller:"
+ getAnalysisEngineController().getComponentName()
+ " Unable to Send Message To Remote Endpoint: " + anEndpoint.getEndpoint()
+ " CAS:" + aCasReferenceId + " Not In The Cache");
}
// Get the connection object for a given endpoint
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
if (endpointConnection == null) {
throw new AsynchAEException("Controller:"
+ getAnalysisEngineController().getComponentName()
+ " Unable to Send Message To Remote Endpoint: " + anEndpoint.getEndpoint()
+ " Connection is Invalid. InputCasReferenceId:" + anInputCasReferenceId
+ " CasReferenceId:" + aCasReferenceId + " Sequece:" + sequence);
}
if (!endpointConnection.isOpen()) {
if (!isRequest) {
return;
}
}
TextMessage tm = null;
try {
// Create empty JMS Text Message
tm = endpointConnection.produceTextMessage("");
} catch (AsynchAEException ex) {
System.out.println("UIMA AS Service:" + getAnalysisEngineController().getComponentName()
+ " Unable to Send Reply Message To Remote Endpoint: "
+ anEndpoint.getDestination() + ". Broker:" + anEndpoint.getServerURI()
+ " is Unavailable. InputCasReferenceId:" + anInputCasReferenceId
+ " CasReferenceId:" + aCasReferenceId + " Sequece:" + sequence);
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"sendCasToRemoteDelegate",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getComponentName(),
anEndpoint.getEndpoint() });
return;
}
// Save Serialized CAS in case we need to re-send it for analysis
if (anEndpoint.isRetryEnabled()
&& getAnalysisEngineController().getInProcessCache()
.getSerializedCAS(aCasReferenceId) == null) {
getAnalysisEngineController().getInProcessCache().saveSerializedCAS(aCasReferenceId,
aSerializedCAS);
}
if (aSerializedCAS != null) {
msgSize = aSerializedCAS.length();
}
tm.setText(aSerializedCAS);
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
// Add Cas Reference Id to the outgoing JMS Header
tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
// Add common properties to the JMS Header
if (isRequest == true) {
populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
} else {
populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
}
// The following is true when the analytic is a CAS Multiplier
if (sequence > 0 && !isRequest) {
// Override MessageType set in the populateHeaderWithContext above.
// Make the reply message look like a request. This message will contain a new CAS
// produced by the CAS Multiplier. The client will treat this CAS
// differently from the input CAS.
tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId);
// Add a sequence number assigned to this CAS by the controller
tm.setLongProperty(AsynchAEMessage.CasSequence, sequence);
isRequest = true;
// Add the name of the FreeCas Queue
if (freeCASTempQueue != null) {
// Attach a temp queue to the outgoing message. This a queue where
// Free CAS notifications need to be sent from the client
tm.setJMSReplyTo(freeCASTempQueue);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
if (entry != null) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"sendCasToRemoteEndpoint",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_send_cas_to_collocated_service_detail__FINE",
new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
anEndpoint.getEndpoint(), aCasReferenceId, anInputCasReferenceId,
entry.getInputCasReferenceId() });
}
}
}
dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
} catch (JMSException e) {
// Unable to establish connection to the endpoint. Log it and continue
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
}
}
catch (ServiceShutdownException e) {
throw e;
} catch (AsynchAEException e) {
throw e;
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
private void sendCasToRemoteEndpoint(boolean isRequest, byte[] aSerializedCAS,
String anInputCasReferenceId, String aCasReferenceId, Endpoint anEndpoint,
boolean startTimer, long sequence) throws AsynchAEException, ServiceShutdownException {
long msgSize = 0;
try {
if (aborting) {
return;
}
if (aSerializedCAS != null) {
msgSize = aSerializedCAS.length;
}
CacheEntry entry = this.getCacheEntry(aCasReferenceId);
if (entry == null) {
throw new AsynchAEException("Controller:"
+ getAnalysisEngineController().getComponentName()
+ " Unable to Send Message To Remote Endpoint: " + anEndpoint.getEndpoint()
+ " CAS:" + aCasReferenceId + " Not In The Cache");
}
// Get the connection object for a given endpoint
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
if (endpointConnection == null) {
throw new AsynchAEException("Controller:"
+ getAnalysisEngineController().getComponentName()
+ " Unable to Send Message To Remote Endpoint: " + anEndpoint.getEndpoint()
+ " Connection is Invalid. InputCasReferenceId:" + anInputCasReferenceId
+ " CasReferenceId:" + aCasReferenceId + " Sequece:" + sequence);
}
if (!endpointConnection.isOpen()) {
if (!isRequest) {
return;
}
}
BytesMessage tm = null;
try {
// Create empty JMS Text Message
tm = endpointConnection.produceByteMessage();
} catch (AsynchAEException ex) {
System.out.println("UIMA AS Service:" + getAnalysisEngineController().getComponentName()
+ " Unable to Send Reply Message To Remote Endpoint: "
+ anEndpoint.getDestination() + ". Broker:" + anEndpoint.getServerURI()
+ " is Unavailable. InputCasReferenceId:" + anInputCasReferenceId
+ " CasReferenceId:" + aCasReferenceId + " Sequece:" + sequence);
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"sendCasToRemoteDelegate",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getComponentName(),
anEndpoint.getEndpoint() });
return;
}
tm.writeBytes(aSerializedCAS);
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
// Add Cas Reference Id to the outgoing JMS Header
tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
// Add common properties to the JMS Header
if (isRequest == true) {
populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
} else {
populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
}
// The following is true when the analytic is a CAS Multiplier
if (sequence > 0 && !isRequest) {
// Override MessageType set in the populateHeaderWithContext above.
// Make the reply message look like a request. This message will contain a new CAS
// produced by the CAS Multiplier. The client will treat this CAS
// differently from the input CAS.
tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId);
// Add a sequence number assigned to this CAS by the controller
tm.setLongProperty(AsynchAEMessage.CasSequence, sequence);
isRequest = true;
if (freeCASTempQueue != null) {
// Attach a temp queue to the outgoing message. This a queue where
// Free CAS notifications need to be sent from the client
tm.setJMSReplyTo(freeCASTempQueue);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
if (entry != null) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"sendCasToRemoteEndpoint",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_send_cas_to_collocated_service_detail__FINE",
new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
anEndpoint.getEndpoint(), aCasReferenceId, anInputCasReferenceId,
entry.getInputCasReferenceId() });
}
}
}
dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
} catch (JMSException e) {
// Unable to establish connection to the endpoint. Logit and continue
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
}
}
catch (ServiceShutdownException e) {
throw e;
} catch (AsynchAEException e) {
throw e;
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
private void sendCasToRemoteEndpoint(boolean isRequest, String aSerializedCAS, CacheEntry entry,
Endpoint anEndpoint, boolean startTimer) throws AsynchAEException,
ServiceShutdownException {
CasStateEntry casStateEntry = null;
long msgSize = 0;
try {
if (aborting) {
return;
}
casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
entry.getCasReferenceId());
// Get the connection object for a given endpoint
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
if (!endpointConnection.isOpen()) {
if (!isRequest) {
return;
}
}
// Create empty JMS Text Message
TextMessage tm = null;
try {
// Create empty JMS Text Message
tm = endpointConnection.produceTextMessage("");
} catch (AsynchAEException ex) {
System.out.println("UIMA AS Service:" + getAnalysisEngineController().getComponentName()
+ " Unable to Send Reply Message To Remote Endpoint: "
+ anEndpoint.getDestination() + ". Broker:" + anEndpoint.getServerURI()
+ " is Unavailable. CasReferenceId:" + casStateEntry.getCasReferenceId());
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"sendCasToRemoteDelegate",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getComponentName(),
anEndpoint.getEndpoint() });
return;
}
// Save Serialized CAS in case we need to re-send it for analysis
if (anEndpoint.isRetryEnabled()
&& getAnalysisEngineController().getInProcessCache().getSerializedCAS(
entry.getCasReferenceId()) == null) {
getAnalysisEngineController().getInProcessCache().saveSerializedCAS(
entry.getCasReferenceId(), aSerializedCAS);
}
if (aSerializedCAS != null) {
msgSize = aSerializedCAS.length();
}
tm.setText(aSerializedCAS);
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
// Add Cas Reference Id to the outgoing JMS Header
tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
// Add common properties to the JMS Header
if (isRequest == true) {
populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
} else {
populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
}
// The following is true when the analytic is a CAS Multiplier
if (casStateEntry.isSubordinate() && !isRequest) {
// Override MessageType set in the populateHeaderWithContext above.
// Make the reply message look like a request. This message will contain a new CAS
// produced by the CAS Multiplier. The client will treat this CAS
// differently from the input CAS.
tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
isRequest = true;
// Save the id of the parent CAS
tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry
.getCasReferenceId()));
// Add a sequence number assigned to this CAS by the controller
tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
// If this is a Cas Multiplier, add a reference to a special queue where
// the client sends Free Cas Notifications
if (freeCASTempQueue != null) {
// Attach a temp queue to the outgoing message. This is a queue where
// Free CAS notifications need to be sent from the client
tm.setJMSReplyTo(freeCASTempQueue);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"sendCasToRemoteEndpoint",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_send_cas_to_collocated_service_detail__FINE",
new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
anEndpoint.getEndpoint(), entry.getCasReferenceId(),
entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
}
}
dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
} catch (JMSException e) {
// Unable to establish connection to the endpoint. Logit and continue
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
}
}
catch (ServiceShutdownException e) {
throw e;
} catch (AsynchAEException e) {
throw e;
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
private void sendCasToRemoteEndpoint(boolean isRequest, byte[] aSerializedCAS, CacheEntry entry,
Endpoint anEndpoint, boolean startTimer) throws AsynchAEException,
ServiceShutdownException {
CasStateEntry casStateEntry = null;
long msgSize = 0;
try {
if (aborting) {
return;
}
casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
entry.getCasReferenceId());
// Get the connection object for a given endpoint
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
if (aSerializedCAS != null) {
msgSize = aSerializedCAS.length;
}
if (!endpointConnection.isOpen()) {
if (!isRequest) {
return;
}
}
// Create empty JMS Text Message
BytesMessage tm = null;
try {
// Create empty JMS Text Message
tm = endpointConnection.produceByteMessage();
} catch (AsynchAEException ex) {
System.out.println("UIMA AS Service:" + getAnalysisEngineController().getComponentName()
+ " Unable to Send Reply Message To Remote Endpoint: "
+ anEndpoint.getDestination() + ". Broker:" + anEndpoint.getServerURI()
+ " is Unavailable. CasReferenceId:" + casStateEntry.getCasReferenceId());
return;
}
tm.writeBytes(aSerializedCAS);
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
// Add Cas Reference Id to the outgoing JMS Header
tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
// Add common properties to the JMS Header
if (isRequest == true) {
populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
} else {
populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
}
if (casStateEntry == null) {
return;
}
// The following is true when the analytic is a CAS Multiplier
if (casStateEntry.isSubordinate() && !isRequest) {
// Override MessageType set in the populateHeaderWithContext above.
// Make the reply message look like a request. This message will contain a new CAS
// produced by the CAS Multiplier. The client will treat this CAS
// differently from the input CAS.
tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
isRequest = true;
// Save the id of the parent CAS
tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry
.getCasReferenceId()));
// Add a sequence number assigned to this CAS by the controller
tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
// If this is a Cas Multiplier, add a reference to a special queue where
// the client sends Free Cas Notifications
if (freeCASTempQueue != null) {
// Attach a temp queue to the outgoing message. This is a queue where
// Free CAS notifications need to be sent from the client
tm.setJMSReplyTo(freeCASTempQueue);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"sendCasToRemoteEndpoint",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_send_cas_to_collocated_service_detail__FINE",
new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
anEndpoint.getEndpoint(), entry.getCasReferenceId(),
entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
}
}
dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
} catch (JMSException e) {
// Unable to establish connection to the endpoint. Logit and continue
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
}
} catch (ServiceShutdownException e) {
throw e;
} catch (AsynchAEException e) {
throw e;
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
private Delegate lookupDelegate(String aDelegateKey) {
if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
Delegate delegate = ((AggregateAnalysisEngineController) getAnalysisEngineController())
.lookupDelegate(aDelegateKey);
return delegate;
}
return null;
}
private void addCasToOutstandingList(CacheEntry entry, boolean isRequest, String aDelegateKey) {
Delegate delegate = null;
if (isRequest && (delegate = lookupDelegate(aDelegateKey)) != null) {
delegate.addCasToOutstandingList(entry.getCasReferenceId());
}
}
private void removeCasFromOutstandingList(CacheEntry entry, boolean isRequest, String aDelegateKey) {
Delegate delegate = null;
if (isRequest && (delegate = lookupDelegate(aDelegateKey)) != null) {
delegate.removeCasFromOutstandingList(entry.getCasReferenceId());
}
}
private String getTopParentCasReferenceId(String casReferenceId) throws Exception {
if (!getAnalysisEngineController().getLocalCache().containsKey(casReferenceId)) {
return null;
}
CasStateEntry casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
casReferenceId);
if (casStateEntry.isSubordinate()) {
// Recurse until the top CAS reference Id is found
return getTopParentCasReferenceId(casStateEntry.getInputCasReferenceId());
}
// Return the top ancestor CAS id
return casStateEntry.getCasReferenceId();
}
private void addIdleTime(Message aMessage) {
long t = System.nanoTime();
getAnalysisEngineController().saveReplyTime(t, "");
}
private CacheEntry getCacheEntry(String aCasReferenceId) throws Exception {
CacheEntry cacheEntry = null;
if (getAnalysisEngineController().getInProcessCache().entryExists(aCasReferenceId)) {
cacheEntry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(
aCasReferenceId);
}
return cacheEntry;
}
public void stop() {
stop(Channel.CloseAllChannels);
}
public void stop(int channelsToClose) {
aborting = true;
try {
// Fetch iterator over all Broker Connections. This service may be connected
// to many brokers. Each broker connection may handle multiple sessions to
// different reply queues
Iterator it = connectionMap.keySet().iterator();
JmsEndpointConnection_impl endpointConnection = null;
// iterate over connections
while (it.hasNext()) {
// The key is the broker URL
String key = (String) it.next();
// Fetch a connection object for a given URL
Object value = connectionMap.get(key);
if (value instanceof BrokerConnectionEntry) {
BrokerConnectionEntry brokerConnectionEntry = (BrokerConnectionEntry) value;
// A connection object may have many endpoint objects. There is a separate
// endpoint object per reply queue.
Iterator replyEndpointIterator = brokerConnectionEntry.endpointMap.keySet().iterator();
// Iterate over endpoints, each representing a reply queue
while (replyEndpointIterator.hasNext()) {
// Get endpoint object for a reply queue. The abort() call below
// just closes a session and a producer. The JMS Connection is closed
// outside of this while-loop when we clean up all the sessions.
endpointConnection = brokerConnectionEntry.endpointMap
.get(replyEndpointIterator.next());
// Close the session and the producer
endpointConnection.abort();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"stop",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_forced_endpoint_close__INFO",
new Object[] { getAnalysisEngineController().getName(),
endpointConnection.getEndpoint(), endpointConnection.getServerUri() });
}
}
// Cancel any pending timers and finally close the JMS Connection to the
// broker
if (brokerConnectionEntry != null) {
if (brokerConnectionEntry.getConnectionTimer() != null) {
brokerConnectionEntry.getConnectionTimer().cancelTimer();
}
if (brokerConnectionEntry.getConnection() != null) {
try {
brokerConnectionEntry.getConnection().close();
System.out.println("JMS Connection to Broker: " + key + " Closed");
} catch (Exception ex) { /* ignore, we are stopping */
}
}
}
}
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_output_channel_aborted__INFO",
new Object[] { getAnalysisEngineController().getName() });
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"stop", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
}
}
private String extractURLWithProtocol(String aProtocolList, String aProtocol) {
StringTokenizer tokenizer = new StringTokenizer(aProtocolList, ",");
while (tokenizer.hasMoreTokens()) {
String token = tokenizer.nextToken().trim();
if (token.toLowerCase().startsWith(aProtocol.toLowerCase())) {
return token;
}
}
return null;
}
public void cancelTimers() {
if (connectionMap.size() > 0) {
Iterator<String> it = connectionMap.keySet().iterator();
while (it.hasNext()) {
String key = it.next();
BrokerConnectionEntry ce = (BrokerConnectionEntry) connectionMap.get(key);
if (ce != null && ce.getConnectionTimer() != null) {
ce.getConnectionTimer().cancelTimer();
}
}
}
}
public static class BrokerConnectionEntry {
private String brokerURL;
private Connection connection;
private ConnectionTimer connectionTimer;
Map<Object, JmsEndpointConnection_impl> endpointMap = new ConcurrentHashMap<Object, JmsEndpointConnection_impl>();
public String getBrokerURL() {
return brokerURL;
}
public void setConnectionTimer(ConnectionTimer aConnectionTimer) {
connectionTimer = aConnectionTimer;
}
public ConnectionTimer getConnectionTimer() {
return connectionTimer;
}
public void setBrokerURL(String brokerURL) {
this.brokerURL = brokerURL;
}
public Connection getConnection() {
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public void addEndpointConnection(Object key, JmsEndpointConnection_impl endpointConnection) {
endpointMap.put(key, endpointConnection);
}
public JmsEndpointConnection_impl getEndpointConnection(Object key) {
return endpointMap.get(key);
}
public boolean endpointExists(Object key) {
return endpointMap.containsKey(key);
}
public void removeEndpoint(Object key) {
endpointMap.remove(key);
}
}
protected class ConnectionTimer {
private final Class CLASS_NAME = ConnectionTimer.class;
private Timer timer;
private long inactivityTimeout;
private AnalysisEngineController controller;
private BrokerConnectionEntry brokerDestinations;
private long connectionCreationTimestamp;
private String componentName = "";
public ConnectionTimer(BrokerConnectionEntry aBrokerDestinations) {
brokerDestinations = aBrokerDestinations;
}
public void setInactivityTimeout(long anInactivityTimeout) {
inactivityTimeout = anInactivityTimeout;
}
public void setAnalysisEngineController(AnalysisEngineController aController) {
controller = aController;
if (controller != null) {
componentName = controller.getComponentName();
}
}
public void setConnectionCreationTimestamp(long aConnectionCreationTimestamp) {
connectionCreationTimestamp = aConnectionCreationTimestamp;
}
public void startTimer(long aConnectionCreationTimestamp, final Endpoint endpoint) {
startTimer(aConnectionCreationTimestamp, endpoint, inactivityTimeout, componentName);
}
public synchronized void startTimer(long aConnectionCreationTimestamp, final Endpoint endpoint,
long currentInactivityTimeout, String aComponentName) {
final long cachedConnectionCreationTimestamp = aConnectionCreationTimestamp;
Date timeToRun = new Date(System.currentTimeMillis() + currentInactivityTimeout);
if (timer != null) {
timer.cancel();
}
if (controller != null) {
timer = new Timer("Controller:" + aComponentName + ":Reply TimerThread-:" + endpoint + ":"
+ System.nanoTime());
} else {
timer = new Timer("Reply TimerThread-:" + endpoint + ":" + System.nanoTime());
}
timer.schedule(new TimerTask() {
public void run() {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"startTimer",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_inactivity_timer_expired__INFO",
new Object[] { Thread.currentThread().getName(), componentName,
inactivityTimeout, endpoint });
}
if (connectionCreationTimestamp <= cachedConnectionCreationTimestamp) {
try {
if (brokerDestinations.getConnection() != null
&& !((ActiveMQConnection) brokerDestinations.getConnection()).isClosed()) {
try {
brokerDestinations.getConnection().stop();
brokerDestinations.getConnection().close();
brokerDestinations.setConnection(null);
} catch (Exception e) {
// Ignore this for now. Attempting to close connection that has been closed
// Ignore we are shutting down
} finally {
for (Entry<Object, JmsEndpointConnection_impl> endpoints : brokerDestinations.endpointMap
.entrySet()) {
endpoints.getValue().close(); // close session and producer
}
brokerDestinations.endpointMap.clear();
connectionMap.remove(endpoint.getServerURI());
}
}
brokerDestinations.setConnection(null);
} catch (Exception e) {
} finally {
removeDestinationFromManagedList(brokerDestinations, endpoint);
}
}
cancelTimer();
}
}, timeToRun);
}
private void removeDestinationFromManagedList(BrokerConnectionEntry brokerDestinations,
Endpoint endpoint) {
String key = endpoint.getEndpoint() + endpoint.getServerURI();
String destination = endpoint.getEndpoint();
if (endpoint.getDestination() != null
&& endpoint.getDestination() instanceof ActiveMQDestination) {
destination = ((ActiveMQDestination) endpoint.getDestination()).getPhysicalName();
key = destination;
}
if (brokerDestinations.endpointExists(key)) {
brokerDestinations.removeEndpoint(key);
}
}
private void cancelTimer() {
if (timer != null) {
timer.cancel();
timer.purge();
}
}
public synchronized void stopTimer() {
cancelTimer();
timer = null;
}
}
private static class RecoveryThread implements Runnable {
Endpoint endpoint;
CacheEntry entry;
boolean isRequest;
AnalysisEngineController controller;
JmsOutputChannel outputChannel;
public RecoveryThread(JmsOutputChannel channel, Endpoint anEndpoint, CacheEntry anEntry,
boolean isRequest, AnalysisEngineController aController) {
endpoint = anEndpoint;
entry = anEntry;
controller = aController;
this.isRequest = isRequest;
outputChannel = channel;
}
public void run() {
Delegate delegate = outputChannel.lookupDelegate(endpoint.getDelegateKey());
// Removes the failed CAS from the list of CASes pending reply. This also
// cancels the timer if this CAS was the oldest pending CAS, and if there
// are other CASes pending a fresh timer is started.
outputChannel.removeCasFromOutstandingList(entry, isRequest, endpoint.getDelegateKey());
if (delegate != null) {
// Mark this delegate as Failed
delegate.getEndpoint().setStatus(Endpoint.FAILED);
// Destroy listener associated with a reply queue for this delegate
InputChannel ic = controller.getInputChannel(delegate.getEndpoint().getDestination()
.toString());
if (ic != null && delegate != null && delegate.getEndpoint() != null) {
ic.destroyListener(delegate.getEndpoint().getDestination().toString(), endpoint
.getDelegateKey());
}
}
// Setup error context and handle failure in the error handler
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
errorContext.add(AsynchAEMessage.CasReference, entry.getCasReferenceId());
errorContext.add(AsynchAEMessage.Endpoint, endpoint);
errorContext.handleSilently(true); // dont dump exception to the log
// Failure on send treat as timeout
delegate.handleError(new MessageTimeoutException(), errorContext);
}
}
}