blob: 04721131e687d8f30d267c212e00f826310c03dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.aae.handler.input;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaSerializer;
import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.Endpoint_impl;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.ErrorContext;
import org.apache.uima.aae.error.InvalidMessageException;
import org.apache.uima.aae.handler.HandlerBase;
import org.apache.uima.aae.jmx.ServicePerformance;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.aae.monitor.Monitor;
import org.apache.uima.aae.monitor.statistics.DelegateStats;
import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
import org.apache.uima.aae.monitor.statistics.TimerStats;
import org.apache.uima.analysis_engine.asb.impl.FlowContainer;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.Marker;
import org.apache.uima.cas.impl.XmiSerializationSharedData;
import org.apache.uima.util.Level;
public class ProcessRequestHandler_impl extends HandlerBase {
private static final Class CLASS_NAME = ProcessRequestHandler_impl.class;
private Object mux = new Object();
private UimaSerializer uimaSerializer = new UimaSerializer();
public ProcessRequestHandler_impl(String aName) {
super(aName);
}
private void cacheStats(String aCasReferenceId, long aTimeWaitingForCAS,
long aTimeToDeserializeCAS) throws Exception {
CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
entry.incrementTimeWaitingForCAS(aTimeWaitingForCAS);
entry.incrementTimeToDeserializeCAS(aTimeToDeserializeCAS);
}
private boolean messageContainsXMI(MessageContext aMessageContext, String casReferenceId)
throws Exception {
// Fetch serialized CAS from the message
String xmi = aMessageContext.getStringMessage();
// *****************************************************************
// ***** NO XMI In Message. Kick this back to sender with exception
// *****************************************************************
if (xmi == null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_message_has_no_cargo__INFO",
new Object[] { aMessageContext.getEndpoint().getEndpoint() });
}
CasStateEntry stateEntry = null;
String parentCasReferenceId = null;
try {
stateEntry = getController().getLocalCache().lookupEntry(casReferenceId);
if (stateEntry != null && stateEntry.isSubordinate()) {
CasStateEntry topParentEntry = getController().getLocalCache().getTopCasAncestor(
casReferenceId);
parentCasReferenceId = topParentEntry.getCasReferenceId();
}
} catch (Exception e) {
}
getController().getOutputChannel().sendReply(
new InvalidMessageException("No XMI data in message"), casReferenceId,
parentCasReferenceId, aMessageContext.getEndpoint(), AsynchAEMessage.Process);
// Dont process this empty message
return false;
}
return true;
}
private synchronized CAS getCAS(boolean fetchCASFromShadowCasPool, String shadowCasPoolKey,
String casReceivedFrom) {
CAS cas = null;
// If this is a new CAS (generated by a CM), fetch a CAS from a Shadow Cas Pool associated with
// a CM that
// produced the CAS. Each CM will have its own Shadow Cas Pool
if (fetchCASFromShadowCasPool) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "getCAS",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_cm__FINE",
new Object[] { shadowCasPoolKey });
}
// Aggregate time spent waiting for a CAS in the shadow cas pool
((AggregateAnalysisEngineController) getController()).getDelegateServicePerformance(
shadowCasPoolKey).beginWaitOnShadowCASPool();
cas = getController().getCasManagerWrapper().getNewCas(shadowCasPoolKey);
((AggregateAnalysisEngineController) getController()).getDelegateServicePerformance(
shadowCasPoolKey).endWaitOnShadowCASPool();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "getCAS",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted_cm__FINE",
new Object[] { shadowCasPoolKey });
}
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "getCAS",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas__FINE",
new Object[] { casReceivedFrom });
}
// Aggregate time spent waiting for a CAS in the service cas pool
getController().getServicePerformance().beginWaitOnCASPool();
cas = getController().getCasManagerWrapper().getNewCas();
getController().getServicePerformance().endWaitOnCASPool();
ServicePerformance sp = getController().getServicePerformance();
sp.incrementCasPoolWaitTime(sp.getTimeWaitingForCAS());
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "getCAS",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted__FINE",
new Object[] { casReceivedFrom });
}
}
return cas;
}
/**
*
* @param casReferenceId
* @param freeCasEndpoint
* @param shadowCasPoolKey
* @param aMessageContext
* @return
* @throws Exception
*/
private CacheEntry deserializeCASandRegisterWithCache(String casReferenceId,
Endpoint freeCasEndpoint, String shadowCasPoolKey, MessageContext aMessageContext)
throws Exception {
long inTime = System.nanoTime();
boolean casRegistered = false;
// Time how long we wait on Cas Pool to fetch a new CAS
long t1 = getController().getCpuTime();
// *************************************************************************
// Fetch CAS from a Cas Pool. If the CAS came from a Cas Multiplier
// fetch the CAS from a shadow CAS pool. Otherwise, fetch the CAS
// from the service CAS Pool.
// *************************************************************************
Endpoint endpoint = aMessageContext.getEndpoint();
CAS cas = getCAS(aMessageContext.propertyExists(AsynchAEMessage.CasSequence), shadowCasPoolKey,
endpoint.getEndpoint());
long timeWaitingForCAS = getController().getCpuTime() - t1;
// Check if we are still running
if (getController().isStopped()) {
// The Controller is in shutdown state.
getController().dropCAS(cas);
return null;
}
// *************************************************************************
// Deserialize CAS from the message
// *************************************************************************
t1 = getController().getCpuTime();
String serializationStrategy = endpoint.getSerializer();
XmiSerializationSharedData deserSharedData = null;
CacheEntry entry = null;
if (serializationStrategy.equals("xmi")) {
// Fetch serialized CAS from the message
String xmi = aMessageContext.getStringMessage();
deserSharedData = new XmiSerializationSharedData();
// UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
} else if (serializationStrategy.equals("binary")) {
// *************************************************************************
// Register the CAS with a local cache
// *************************************************************************
// CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext,
// deserSharedData, casReferenceId);
byte[] binarySource = aMessageContext.getByteMessage();
// UimaSerializer.deserializeCasFromBinary(binarySource, cas);
uimaSerializer.deserializeCasFromBinary(binarySource, cas);
}
// *************************************************************************
// Check and set up for Delta CAS reply
// *************************************************************************
boolean acceptsDeltaCas = false;
Marker marker = null;
if (aMessageContext.propertyExists(AsynchAEMessage.AcceptsDeltaCas)) {
acceptsDeltaCas = aMessageContext.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
if (acceptsDeltaCas) {
marker = cas.createMarker();
}
}
// *************************************************************************
// Register the CAS with a local cache
// *************************************************************************
// CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext,
// deserSharedData, casReferenceId);
entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData,
casReferenceId, marker, acceptsDeltaCas);
long timeToDeserializeCAS = getController().getCpuTime() - t1;
getController().incrementDeserializationTime(timeToDeserializeCAS);
LongNumericStatistic statistic;
if ((statistic = getController().getMonitor().getLongNumericStatistic("",
Monitor.TotalDeserializeTime)) != null) {
statistic.increment(timeToDeserializeCAS);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_deserialize_cas_time_FINE",
new Object[] { (double) timeToDeserializeCAS / 1000000.0 });
}
// Update Stats
ServicePerformance casStats = getController().getCasStatistics(casReferenceId);
casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
if (getController().isTopLevelComponent()) {
synchronized (mux) {
getController().getServicePerformance().incrementCasDeserializationTime(
timeToDeserializeCAS);
}
}
getController().saveTime(inTime, casReferenceId, getController().getName());
if (getController() instanceof AggregateAnalysisEngineController) {
// If the message came from a Cas Multiplier, associate the input/parent CAS id with this CAS
if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
// Fetch parent CAS id
String inputCasReferenceId = aMessageContext
.getMessageStringProperty(AsynchAEMessage.InputCasReference);
if (shadowCasPoolKey != null) {
// Save the key of the Cas Multiplier in the cache. It will be now known which Cas
// Multiplier produced this CAS
entry.setCasProducerKey(shadowCasPoolKey);
}
// associate this subordinate CAS with the parent CAS
entry.setInputCasReferenceId(inputCasReferenceId);
// Save a Cas Multiplier endpoint where a Free CAS notification will be sent
entry.setFreeCasEndpoint(freeCasEndpoint);
cacheStats(inputCasReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
} else {
cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
}
DelegateStats stats = new DelegateStats();
if (entry.getStat() == null) {
entry.setStat(stats);
// Add entry for self (this aggregate). MessageContext.getEndpointName()
// returns the name of the queue receiving the message.
stats.put(getController().getServiceEndpointName(), new TimerStats());
} else {
if (!stats.containsKey(getController().getServiceEndpointName())) {
stats.put(getController().getServiceEndpointName(), new DelegateStats());
}
}
} else {
cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_deserialized_cas_ready_to_process_FINE",
new Object[] { aMessageContext.getEndpoint().getEndpoint() });
}
cacheProcessCommandInClientEndpoint();
return entry;
}
private String getCasReferenceId(MessageContext aMessageContext) throws Exception {
if (!aMessageContext.propertyExists(AsynchAEMessage.CasReference)) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_message_has_cas_refid__INFO",
new Object[] { aMessageContext.getEndpoint().getEndpoint() });
}
getController().getOutputChannel().sendReply(
new InvalidMessageException("No Cas Reference Id Received From Delegate In message"),
null, null, aMessageContext.getEndpoint(), AsynchAEMessage.Process);
return null;
}
return aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
}
/**
* Handles process request from a remote client
*
* @param aMessageContext
* - contains a message from UIMA-AS Client
* @throws AsynchAEException
*/
private void handleProcessRequestFromRemoteClient(MessageContext aMessageContext)
throws AsynchAEException {
CacheEntry entry = null;
String casReferenceId = null;
// Check if there is a cargo in the message
if (aMessageContext.getMessageIntProperty(AsynchAEMessage.Payload) == AsynchAEMessage.XMIPayload
&& aMessageContext.getStringMessage() == null) {
return; // No XMI just return
}
try {
String newCASProducedBy = null;
// Get the CAS Reference Id of the input CAS
// Fetch id of the CAS from the message. If it doesnt exist the method will create an entry in
// the log file and return null
casReferenceId = getCasReferenceId(aMessageContext);
if (casReferenceId == null) {
return; // Invalid message. Nothing to do
}
// Initially make both equal
String inputCasReferenceId = casReferenceId;
// Destination where Free Cas Notification will be sent if the CAS came from a Cas Multiplier
Endpoint freeCasEndpoint = null;
CasStateEntry inputCasStateEntry = null;
// CASes generated by a Cas Multiplier will have a CasSequence property set.
if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
// Fetch the name of the Cas Multiplier's input queue
// String cmEndpointName = aMessageContext.getEndpoint().getEndpoint();
String cmEndpointName = aMessageContext
.getMessageStringProperty(AsynchAEMessage.MessageFrom);
newCASProducedBy = ((AggregateAnalysisEngineController) getController())
.lookUpDelegateKey(cmEndpointName);
// Fetch an ID of the parent CAS
inputCasReferenceId = aMessageContext
.getMessageStringProperty(AsynchAEMessage.InputCasReference);
// Fetch Cache entry for the parent CAS
CacheEntry inputCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(
inputCasReferenceId);
// Fetch an endpoint where Free CAS Notification must be sent.
// This endpoint is unique per CM instance. Meaning, each
// instance of CM will have an endpoint where it expects Free CAS
// notifications.
freeCasEndpoint = aMessageContext.getEndpoint();
// Clone an endpoint where Free Cas Request will be sent
freeCasEndpoint = (Endpoint) ((Endpoint_impl) freeCasEndpoint).clone();
if (getController() instanceof AggregateAnalysisEngineController) {
inputCasStateEntry = ((AggregateAnalysisEngineController) getController())
.getLocalCache().lookupEntry(inputCasReferenceId);
// Associate Free Cas Notification Endpoint with an input Cas
inputCasStateEntry.setFreeCasNotificationEndpoint(freeCasEndpoint);
}
computeStats(aMessageContext, inputCasReferenceId);
// Reset the destination
aMessageContext.getEndpoint().setDestination(null);
// This CAS came in from a CAS Multiplier. Treat it differently than the
// input CAS. In case the Aggregate needs to send this CAS to the
// client, retrieve the client destination by looking up the client endpoint
// using input CAS reference id. CASes generated by the CAS multiplier will have
// the same Cas Reference id.
Endpoint replyToEndpoint = inputCasCacheEntry.getMessageOrigin();
// The message context contains a Cas Multiplier endpoint. Since
// we dont want to send a generated CAS back to the CM, override
// with an endpoint provided by the client of
// this service. Client endpoint is attached to an input Cas cache entry.
aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
// Before sending a CAS to Cas Multiplier, the aggregate has
// saved the CM key in the CAS cache entry. Fetch the key
// of the CM so that we can ask the right Shadow Cas Pool for
// a new CAS. Every Shadow Cas Pool has a unique id which
// corresponds to a Cas Multiplier key.
// newCASProducedBy = inputCasCacheEntry.getCasMultiplierKey();
if (getController() instanceof AggregateAnalysisEngineController) {
Endpoint casMultiplierEndpoint = ((AggregateAnalysisEngineController) getController())
.lookUpEndpoint(newCASProducedBy, false);
if (casMultiplierEndpoint != null) {
// Save the URL of the broker managing the Free Cas Notification queue.
// This is needed when we try to establish a connection to the broker.
freeCasEndpoint.setServerURI(casMultiplierEndpoint.getServerURI());
}
}
} else if (getController().isTopLevelComponent()) {
if (getController() instanceof AggregateAnalysisEngineController) {
((AggregateAnalysisEngineController) getController()).addMessageOrigin(casReferenceId,
aMessageContext.getEndpoint());
}
if (getController().isCasMultiplier()) {
// Send an ack to the client. The ack message will include a FreeCasQueue
// to enable the client to send messages to the service processing a CAS.
try {
getController().getOutputChannel().sendReply(AsynchAEMessage.ServiceInfo,
aMessageContext.getEndpoint(), casReferenceId);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
}
}
}
// To prevent processing multiple messages with the same CasReferenceId, check the CAS cache
// to see if the message with a given CasReferenceId is already being processed. It is, the
// message contains the same request possibly issued by the caller due to timeout. Also this
// mechanism helps with dealing with scenario when this service is not up when the client
// sends
// request. The client can keep re-sending the same request until its timeout thresholds are
// exceeded. By that time, there may be multiple messages in this service queue with the same
// CasReferenceId. When the service finally comes back up, it will have multiple messages in
// its queue possibly from the same client. Only the first message for any given
// CasReferenceId
// should be processed.
if (!getController().getInProcessCache().entryExists(casReferenceId)) {
CasStateEntry cse = null;
if (getController().getLocalCache().lookupEntry(casReferenceId) == null) {
// Create a new entry in the local cache for the CAS received from the remote
cse = getController().getLocalCache().createCasStateEntry(casReferenceId);
// Check if this CAS is a child
if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
cse.setInputCasReferenceId(inputCasReferenceId);
}
} else {
cse = getController().getLocalCache().lookupEntry(casReferenceId);
}
if (getController() instanceof AggregateAnalysisEngineController
&& aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
String delegateInputQueueName = aMessageContext
.getMessageStringProperty(AsynchAEMessage.MessageFrom);
String delegateKey = ((AggregateAnalysisEngineController) getController())
.lookUpDelegateKey(delegateInputQueueName); // aMessageContext.getEndpoint().getEndpoint());
if (delegateKey != null) {
Delegate delegate = ((AggregateAnalysisEngineController) getController())
.lookupDelegate(delegateKey);
// Save the last delegate handling this CAS
cse.setLastDelegate(delegate);
// If there is one thread receiving messages from Cas Multiplier increment number of
// child Cases
// of the parent CAS. If there are more threads (consumers) a special object
// ConcurrentMessageListener
// has already incremented the count. This special object enforces order of processing
// for CASes
// coming in from the Cas Multiplier.
if (!delegate.hasConcurrentConsumersOnReplyQueue()) {
inputCasStateEntry.incrementSubordinateCasInPlayCount();
}
}
}
entry = deserializeCASandRegisterWithCache(casReferenceId, freeCasEndpoint,
newCASProducedBy, aMessageContext);
if (getController().isStopped() || entry == null || entry.getCas() == null) {
if (entry != null) {
// The Controller is in shutdown state, release the CAS
getController().dropCAS(entry.getCasReferenceId(), true);
entry = null;
}
return;
}
// *****************************************************************
// Process the CAS
// *****************************************************************
invokeProcess(entry.getCas(), inputCasReferenceId, casReferenceId, aMessageContext,
newCASProducedBy);
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_duplicate_request__INFO", new Object[] { casReferenceId });
}
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
}
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint());
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
if (entry != null) {
getController().dropCAS(entry.getCas());
}
getController().getErrorHandlerChain().handle(e, errorContext, getController());
}
}
private void handleProcessRequestWithCASReference(MessageContext aMessageContext)
throws AsynchAEException {
boolean isNewCAS = false;
String newCASProducedBy = null;
try {
// This is only used when handling CASes produced by CAS Multiplier
String inputCasReferenceId = null;
CAS cas = null;
CasStateEntry cse = null;
String casReferenceId = getCasReferenceId(aMessageContext);
if ((cse = getController().getLocalCache().lookupEntry(casReferenceId)) == null) {
// Create a new entry in the local cache for the CAS received from the remote
cse = getController().getLocalCache().createCasStateEntry(casReferenceId);
}
// Check if this Cas has been sent from a Cas Multiplier. If so, its sequence will be > 0
if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
isNewCAS = true;
Endpoint casMultiplierEndpoint = aMessageContext.getEndpoint();
if (casMultiplierEndpoint == null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"handleProcessRequestWithCASReference",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_endpoint_for_reply__INFO",
new Object[] { casReferenceId });
}
return;
}
// Get the id of the parent Cas
inputCasReferenceId = aMessageContext
.getMessageStringProperty(AsynchAEMessage.InputCasReference);
if (cse.getInputCasReferenceId() == null) {
cse.setInputCasReferenceId(inputCasReferenceId);
}
if (getController() instanceof AggregateAnalysisEngineController) {
CasStateEntry parentCasEntry = getController().getLocalCache().lookupEntry(
inputCasReferenceId);
// Check if the parent CAS is in a failed state first
if (parentCasEntry != null && parentCasEntry.isFailed()) {
// handle CAS release
getController().process(null, casReferenceId);
return;
}
String delegateKey = ((AggregateAnalysisEngineController) getController())
.lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
Delegate delegate = ((AggregateAnalysisEngineController) getController())
.lookupDelegate(delegateKey);
cse.setLastDelegate(delegate);
newCASProducedBy = delegate.getKey();
casMultiplierEndpoint.setIsCasMultiplier(true);
try {
// Save the endpoint of the CM which produced the Cas
getController().getInProcessCache().setCasProducer(casReferenceId, newCASProducedBy);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"handleProcessRequestWithCASReference",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
new Object[] { e });
}
return;
}
// Safety check. The input Cas should not be null here
if (inputCasReferenceId != null) {
try {
Endpoint endp = null;
// Located the origin of the parent Cas. The produced Cas will inherit the origin from
// its parent.
// Once the origin is identified, save the origin using the produced Cas id as a key.
if (endp == null) {
boolean gotTheEndpoint = false;
String parentCasId = inputCasReferenceId;
// Loop through the parent tree until an origin is found
while (!gotTheEndpoint) {
// Check if the current parent has an associated origin
endp = ((AggregateAnalysisEngineController) getController())
.getMessageOrigin(parentCasId);
// Check if there is an origin. If so, we are done
if (endp != null) {
break;
}
// The current parent has no origin, get its parent and try again
CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(
parentCasId);
parentCasId = entry.getInputCasReferenceId();
// Check if we reached the top of the hierarchy tree. If so, we have no origin.
// This should
// never be the case. Every Cas must have an origin
if (parentCasId == null) {
break;
}
}
}
// If origin not found log it as this indicates an error
if (endp == null) {
System.out.println("Endpoint Not Found For Cas Id:" + inputCasReferenceId);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"handleProcessRequestWithCASReference",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_msg_origin_not_found__INFO",
new Object[] { getController().getComponentName(), inputCasReferenceId });
}
} else {
((AggregateAnalysisEngineController) getController()).addMessageOrigin(
casReferenceId, endp);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"handleProcessRequestWithCASReference",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_msg_origin_added__FINEST",
new Object[] { getController().getComponentName(), casReferenceId,
newCASProducedBy });
}
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"handleProcessRequestWithCASReference",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
}
}
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
CLASS_NAME.getName(),
"handleProcessRequestWithCASReference",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_input_cas_invalid__INFO",
new Object[] { getController().getComponentName(), newCASProducedBy,
casReferenceId });
}
}
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_new_cas__FINE", new Object[] { casReferenceId, newCASProducedBy });
}
aMessageContext.getEndpoint().setEndpoint(casMultiplierEndpoint.getEndpoint());
aMessageContext.getEndpoint().setServerURI(casMultiplierEndpoint.getServerURI());
} else {
if (getController() instanceof AggregateAnalysisEngineController) {
((AggregateAnalysisEngineController) getController()).addMessageOrigin(casReferenceId,
aMessageContext.getEndpoint());
}
}
cas = getController().getInProcessCache().getCasByReference(casReferenceId);
long arrivalTime = System.nanoTime();
getController().saveTime(arrivalTime, casReferenceId, getController().getName());// aMessageContext.getEndpointName());
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_analyzing_cas__FINE", new Object[] { casReferenceId });
}
// Save Process command in the client endpoint.
cacheProcessCommandInClientEndpoint();
if (getController().isStopped()) {
return;
}
if (isNewCAS) {
invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
} else {
invokeProcess(cas, casReferenceId, null, aMessageContext, newCASProducedBy);
}
} catch (AsynchAEException e) {
throw e;
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
private void handleProcessRequestWithXCAS(MessageContext aMessageContext)
throws AsynchAEException {
try {
// Get the CAS Reference Id of the input CAS
String casReferenceId = getCasReferenceId(aMessageContext);
String inputCasReferenceId = casReferenceId;
// This is only used when handling CASes produced by CAS Multiplier
String newCASProducedBy = null;
if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
// This CAS came in from the CAS Multiplier. Treat it differently than the
// input CAS. First, in case the Aggregate needs to send this CAS to the
// client, retrieve the client destination by looking up the client endpoint
// using input CAS reference id. CASes generated by the CAS multiplier will have
// the same Cas Reference id.
Endpoint replyToEndpoint = getController().getInProcessCache().getCacheEntryForCAS(
casReferenceId).getMessageOrigin();
//
if (getController() instanceof AggregateAnalysisEngineController) {
newCASProducedBy = ((AggregateAnalysisEngineController) getController())
.lookUpDelegateKey(replyToEndpoint.getEndpoint());
}
// MessageContext contains endpoint set by the CAS Multiplier service. Overwrite
// this with the endpoint of the client who sent the input CAS. In case this
// aggregate is configured to send new CASes to the client we know where to send them.
aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
inputCasReferenceId = String.valueOf(casReferenceId);
// Set this to null so that the new CAS gets its own Cas Reference Id below
casReferenceId = null;
}
long arrivalTime = System.nanoTime();
getController().saveTime(arrivalTime, casReferenceId, getController().getName());// aMessageContext.getEndpointName());
// To prevent processing multiple messages with the same CasReferenceId, check the CAS cache
// to see if the message with a given CasReferenceId is already being processed. It is, the
// message contains the same request possibly issued by the caller due to timeout. Also this
// mechanism helps with dealing with scenario when this service is not up when the client
// sends
// request. The client can keep re-sending the same request until its timeout thresholds are
// exceeded. By that time, there may be multiple messages in this service queue with the same
// CasReferenceId. When the service finally comes back up, it will have multiple messages in
// its queue possibly from the same client. Only the first message for any given
// CasReferenceId
// should be processed.
if (casReferenceId == null
|| !getController().getInProcessCache().entryExists(casReferenceId)) {
String xmi = aMessageContext.getStringMessage();
// *****************************************************************
// ***** NO XMI In Message. Kick this back to sender with exception
// *****************************************************************
if (xmi == null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_message_has_no_cargo__INFO",
new Object[] { aMessageContext.getEndpoint().getEndpoint() });
}
getController().getOutputChannel().sendReply(
new InvalidMessageException("No XMI data in message"), casReferenceId, null,
aMessageContext.getEndpoint(), AsynchAEMessage.Process);
// Dont process this empty message
return;
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_request_cas__FINE",
new Object[] { aMessageContext.getEndpoint().getEndpoint() });
}
CAS cas = getController().getCasManagerWrapper().getNewCas();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_request_cas_granted__FINE",
new Object[] { aMessageContext.getEndpoint().getEndpoint() });
}
XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
// UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
if (casReferenceId == null) {
CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext,
deserSharedData);
casReferenceId = entry.getCasReferenceId();
} else {
if (getController() instanceof PrimitiveAnalysisEngineController) {
getController().getInProcessCache().register(cas, aMessageContext, deserSharedData,
casReferenceId);
}
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_deserialized_cas_ready_to_process_FINE",
new Object[] { aMessageContext.getEndpoint().getEndpoint() });
}
cacheProcessCommandInClientEndpoint();
invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_duplicate_request__INFO", new Object[] { casReferenceId });
}
}
} catch (AsynchAEException e) {
throw e;
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
private void cacheProcessCommandInClientEndpoint() {
Endpoint clientEndpoint = getController().getClientEndpoint();
if (clientEndpoint != null) {
clientEndpoint.setCommand(AsynchAEMessage.Process);
}
}
private void handleCollectionProcessCompleteRequest(MessageContext aMessageContext)
throws AsynchAEException {
Endpoint replyToEndpoint = aMessageContext.getEndpoint();
getController().collectionProcessComplete(replyToEndpoint);
}
private void handleReleaseCASRequest(MessageContext aMessageContext) throws AsynchAEException {
String casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"handleReleaseCASRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_release_cas_req__FINE",
new Object[] { getController().getName(), casReferenceId });
}
getController().releaseNextCas(casReferenceId);
}
private void handlePingRequest(MessageContext aMessageContext) {
try {
getController().getOutputChannel().sendReply(AsynchAEMessage.Ping,
aMessageContext.getEndpoint());
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"handlePingRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
}
}
private void handleStopRequest(MessageContext aMessageContext) {
try {
String casReferenceId = aMessageContext
.getMessageStringProperty(AsynchAEMessage.CasReference);
System.out.println("###################Controller::" + getController().getComponentName()
+ " Received <<<STOP>>> Request For CAS:" + casReferenceId);
if (getController() instanceof PrimitiveAnalysisEngineController) {
getController().addAbortedCasReferenceId(casReferenceId);
} else if (getController() instanceof AggregateAnalysisEngineController_impl) {
try {
CasStateEntry casStateEntry = getController().getLocalCache().lookupEntry(casReferenceId);
// Mark the CAS as if it have failed. In this case we dont associate any
// exceptions with this CAS so its really not a failure of a CAS or any
// of its children. We simply use the same logic here as if the CAS failed.
// The Aggregate replyToClient() method will know that this CAS was stopped
// as opposed to failed by the fact that the CAS has no exceptions associated
// with it. In such case the replyToClient() method returns an input CAS as if
// it has been fully processed.
casStateEntry.setFailed();
((AggregateAnalysisEngineController_impl) getController()).stopCasMultipliers();
} catch (Exception ex) {
} // CAS may have already been deleted
}
} catch (Exception e) {
}
}
/**
* Main method called by the predecessor handler.
*
*
*/
public void handle(Object anObjectToHandle) // throws AsynchAEException
{
try {
super.validate(anObjectToHandle);
MessageContext messageContext = (MessageContext) anObjectToHandle;
if (isHandlerForMessage(messageContext, AsynchAEMessage.Request, AsynchAEMessage.Process)
|| isHandlerForMessage(messageContext, AsynchAEMessage.Request,
AsynchAEMessage.CollectionProcessComplete)
|| isHandlerForMessage(messageContext, AsynchAEMessage.Request,
AsynchAEMessage.ReleaseCAS)
|| isHandlerForMessage(messageContext, AsynchAEMessage.Request, AsynchAEMessage.Stop)) {
int payload = messageContext.getMessageIntProperty(AsynchAEMessage.Payload);
int command = messageContext.getMessageIntProperty(AsynchAEMessage.Command);
getController().getControllerLatch().waitUntilInitialized();
// If a Process Request, increment number of CASes processed
if (messageContext.getMessageIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Request
&& command == AsynchAEMessage.Process
&& !messageContext.propertyExists(AsynchAEMessage.CasSequence)) {
// Increment number of CASes processed by this service
getController().getServicePerformance().incrementNumberOfCASesProcessed();
}
if (getController().isStopped()) {
return;
}
if (AsynchAEMessage.CASRefID == payload) {
// Fetch id of the CAS from the message.
if (getCasReferenceId(messageContext) == null) {
return; // Invalid message. Nothing to do
}
handleProcessRequestWithCASReference(messageContext);
} else if (AsynchAEMessage.XMIPayload == payload
|| AsynchAEMessage.BinaryPayload == payload) {
// Fetch id of the CAS from the message.
if (getCasReferenceId(messageContext) == null) {
return; // Invalid message. Nothing to do
}
handleProcessRequestFromRemoteClient(messageContext);
} else if (AsynchAEMessage.XCASPayload == payload) {
// Fetch id of the CAS from the message.
if (getCasReferenceId(messageContext) == null) {
return; // Invalid message. Nothing to do
}
handleProcessRequestWithXCAS(messageContext);
} else if (AsynchAEMessage.None == payload
&& AsynchAEMessage.CollectionProcessComplete == command) {
handleCollectionProcessCompleteRequest(messageContext);
} else if (AsynchAEMessage.None == payload && AsynchAEMessage.ReleaseCAS == command) {
handleReleaseCASRequest(messageContext);
} else if (AsynchAEMessage.None == payload && AsynchAEMessage.Stop == command) {
handleStopRequest(messageContext);
} else if (AsynchAEMessage.None == payload && AsynchAEMessage.Ping == command) {
handlePingRequest(messageContext);
}
// Handled Request
return;
}
// Not a Request nor Command. Delegate to the next handler in the chain
super.delegate(messageContext);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"handle", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
getController().getErrorHandlerChain().handle(e,
HandlerBase.populateErrorContext((MessageContext) anObjectToHandle), getController());
}
}
}