// blob: 92361eb5409cad9bfbbfc2b57698d5a43b4b3fe4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.aae.handler.input;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.uima.UIMAFramework;
import org.apache.uima.UIMARuntimeException;
import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.SerializerCache;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaSerializer;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.BaseAnalysisEngineController;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.ErrorContext;
import org.apache.uima.aae.error.ExpiredMessageException;
import org.apache.uima.aae.error.ServiceShutdownException;
import org.apache.uima.aae.error.UimaAsDelegateException;
import org.apache.uima.aae.error.UimaEEServiceException;
import org.apache.uima.aae.handler.HandlerBase;
import org.apache.uima.aae.jmx.ServicePerformance;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.aae.monitor.Monitor;
import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.SerialFormat;
import org.apache.uima.cas.impl.AllowPreexistingFS;
import org.apache.uima.cas.impl.BinaryCasSerDes6;
import org.apache.uima.cas.impl.CASImpl;
import org.apache.uima.cas.impl.MarkerImpl;
import org.apache.uima.cas.impl.Serialization;
import org.apache.uima.cas.impl.TypeSystemImpl;
import org.apache.uima.cas.impl.XmiSerializationSharedData;
import org.apache.uima.cas.impl.BinaryCasSerDes6.ReuseInfo;
import org.apache.uima.util.Level;
public class ProcessResponseHandler extends HandlerBase {
// Class token passed to UIMAFramework.getLogger(...) and used as the source class name in log records.
private static final Class CLASS_NAME = ProcessResponseHandler.class;
/**
 * Creates the handler and forwards the given name to the {@link HandlerBase} constructor.
 *
 * @param aName name passed on to the superclass
 */
public ProcessResponseHandler(String aName) {
super(aName);
}
/**
 * Fetches the endpoint stored in the controller's in-process cache under the given endpoint
 * name and CAS reference id.
 *
 * @param anEndpointName name of the endpoint to look up
 * @param aCasReferenceId id of the CAS the endpoint is associated with
 * @return whatever the cache returns for that pair; callers in this class treat {@code null}
 *         as "not found"
 */
private Endpoint lookupEndpoint(String anEndpointName, String aCasReferenceId) {
  final Endpoint cachedEndpoint =
          getController().getInProcessCache().getEndpoint(anEndpointName, aCasReferenceId);
  return cachedEndpoint;
}
/**
 * Cancels the timer of the cached endpoint that matches the endpoint of the given message and
 * the given CAS reference id, optionally removing that endpoint from the in-process cache.
 * Does nothing when the message context, or its endpoint, is {@code null}.
 *
 * @param aMessageContext message whose endpoint identifies the cached endpoint
 * @param aCasReferenceId CAS id used for the cache lookup; when {@code null} and the message is
 *          a CollectionProcessComplete command, the key {@code ":CpC"} is used instead
 * @param removeEndpoint when {@code true}, the endpoint is removed from the cache after its
 *          timer has been cancelled
 * @throws AsynchAEException declared because reading message properties may fail
 */
private void cancelTimer(MessageContext aMessageContext, String aCasReferenceId,
        boolean removeEndpoint) throws AsynchAEException {
  // Without a message context that carries an endpoint there is nothing to look up.
  if (aMessageContext == null || aMessageContext.getEndpoint() == null) {
    return;
  }
  final String endpointName = aMessageContext.getEndpoint().getEndpoint();
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "cancelTimer",
            UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cancel_timer__FINE",
            new Object[] { endpointName, aCasReferenceId });
  }

  // CollectionProcessComplete replies carry no CAS id; a fixed key is used for them.
  String timerKey = aCasReferenceId;
  if (timerKey == null
          && aMessageContext.propertyExists(AsynchAEMessage.Command)
          && aMessageContext.getMessageIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.CollectionProcessComplete) {
    timerKey = ":CpC";
  }

  final Endpoint timedEndpoint = lookupEndpoint(endpointName, timerKey);
  if (timedEndpoint == null) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
              "cancelTimer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_endpoint_not_found__INFO",
              new Object[] { endpointName, timerKey });
    }
    return;
  }

  // The response arrived within the timeout interval, so stop the running timer.
  timedEndpoint.cancelTimer();
  if (removeEndpoint) {
    getController().getInProcessCache().removeEndpoint(endpointName, timerKey);
  }
}
/**
 * Calls {@code computeStats} for the reply message and CAS id, then passes the CAS to
 * {@code invokeProcess} of the superclass.
 * NOTE(review): despite the name, no timer is cancelled in this method itself.
 *
 * @param aMessageContext reply message being handled
 * @param aCasReferenceId id of the CAS the reply refers to
 * @param aCAS CAS to hand to {@code invokeProcess}
 * @throws AsynchAEException propagated from the calls made here
 */
private void cancelTimerAndProcess(MessageContext aMessageContext, String aCasReferenceId,
CAS aCAS) throws AsynchAEException {
computeStats(aMessageContext, aCasReferenceId);
super.invokeProcess(aCAS, aCasReferenceId, null, aMessageContext, null);
}
/**
 * Tells whether a reply is expected: the CAS id must still have an entry in the in-process
 * cache and the given endpoint must be waiting for a response.
 *
 * @param aCasReferenceId id of the CAS named in the reply
 * @param anEndpointWithTimer endpoint asked whether it is waiting for a response; only
 *          consulted when the cache entry exists
 * @return {@code true} when both conditions hold
 */
private boolean isMessageExpected(String aCasReferenceId, Endpoint anEndpointWithTimer) {
  final boolean casStillCached =
          getController().getInProcessCache().entryExists(aCasReferenceId);
  return casStillCached && anEndpointWithTimer.isWaitingForResponse();
}
/**
 * Reports a reply for a CAS that is no longer in the CAS cache by passing an
 * {@link ExpiredMessageException} to the controller's error handler chain.
 * <p>
 * Such replies are possible with asynchronous communication: a CAS may have been dropped
 * because of a timeout, and the delegate answers afterwards. For example, requests can pile up
 * in the queue of a service that is not yet running; once it starts, it processes them and
 * sends responses that nobody is waiting for anymore.
 *
 * @param aCasReferenceId id of the CAS named in the late reply
 * @param anEndpoint endpoint the reply came from
 */
private void handleUnexpectedMessage(String aCasReferenceId, Endpoint anEndpoint) {
  final AnalysisEngineController owner = getController();

  final ErrorContext expiredReplyContext = new ErrorContext();
  expiredReplyContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
  expiredReplyContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
  expiredReplyContext.add(AsynchAEMessage.Endpoint, anEndpoint);

  final ExpiredMessageException expired = new ExpiredMessageException();
  owner.getErrorHandlerChain().handle(expired, expiredReplyContext, owner);
}
/**
 * Handles a Process reply from a remote delegate whose payload is a serialized CAS.
 * <p>
 * Steps, in order:
 * <ol>
 * <li>look up the cached endpoint for the reply; give up (with a WARNING) if there is none;</li>
 * <li>remove the CAS id from the delegate's outstanding list; a reply that is not on the list
 * is treated as out-of-band and ignored;</li>
 * <li>optionally merge per-component performance metrics carried by the reply into the top
 * ancestor CAS entry;</li>
 * <li>deserialize the reply into the cached CAS, choosing the strategy from the endpoint's
 * serial format, the delta-CAS flag and whether the CAS is in a parallel step;</li>
 * <li>update deserialization statistics and, once all expected replies have arrived, pass the
 * CAS to {@code invokeProcess}.</li>
 * </ol>
 * Any exception is routed to the controller's error handler chain, except a "Cas Not Found"
 * {@link AsynchAEException}, which is logged as a stale reply. The delegate's process count is
 * incremented in all cases, including the early returns.
 *
 * @param aMessageContext the reply message
 * @param aDelegateKey key of the replying delegate as resolved by the caller; may be
 *          {@code null}, in which case the per-delegate CAS counter is not incremented
 */
private void handleProcessResponseFromRemote(MessageContext aMessageContext, String aDelegateKey) {
  CAS cas = null;
  String casReferenceId = null;
  Endpoint endpointWithTimer = null;
  try {
    casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
    endpointWithTimer = lookupEndpoint(aMessageContext.getEndpoint().getEndpoint(),
            casReferenceId);
    if (endpointWithTimer == null) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "handleProcessResponseFromRemote", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_invalid_endpoint__WARNING",
                new Object[] { aMessageContext.getEndpoint().getEndpoint(), casReferenceId });
      }
      return;
    }
    String delegateKey = ((AggregateAnalysisEngineController) getController())
            .lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
    Delegate delegate = ((AggregateAnalysisEngineController) getController())
            .lookupDelegate(delegateKey);
    boolean casRemovedFromOutstandingList = delegate.removeCasFromOutstandingList(casReferenceId);

    // Check if this process reply message is expected. A message is expected if the Cas Id
    // in the message matches an entry in the delegate's outstanding list. This list stores
    // ids of CASes sent to the remote delegate pending reply.
    if (!casRemovedFromOutstandingList) {
      return; // out of band reply. Most likely the CAS previously timed out
    }
    // Increment number of CASes processed by this delegate
    if (aDelegateKey != null) {
      ServicePerformance delegateServicePerformance = ((AggregateAnalysisEngineController) getController())
              .getServicePerformance(aDelegateKey);
      if (delegateServicePerformance != null) {
        delegateServicePerformance.incrementNumberOfCASesProcessed();
      }
    }

    String xmi = aMessageContext.getStringMessage();

    // Fetch entry from the cache for a given Cas Id. The entry contains a CAS that will be used
    // during deserialization
    CacheEntry cacheEntry = getController().getInProcessCache().getCacheEntryForCAS(
            casReferenceId);
    // check if the client requested Performance Metrics for the CAS
    if (aMessageContext.propertyExists(AsynchAEMessage.CASPerComponentMetrics)) {
      try {
        // find top ancestor of this CAS. All metrics are accumulated there since
        // this is what will be returned to the client
        CacheEntry ancestor =
                getController().getInProcessCache().getTopAncestorCasEntry(cacheEntry);
        if (ancestor != null) {
          // fetch Performance Metrics from remote delegate reply
          List<AnalysisEnginePerformanceMetrics> metrics =
                  UimaSerializer.deserializePerformanceMetrics(aMessageContext.getMessageStringProperty(AsynchAEMessage.CASPerComponentMetrics));
          List<AnalysisEnginePerformanceMetrics> adjustedMetrics =
                  new ArrayList<AnalysisEnginePerformanceMetrics>();
          for (AnalysisEnginePerformanceMetrics delegateMetric : metrics) {
            // Build the unique name: this controller's JMX context (without the "p0=" prefix
            // and " Components" segments, with a leading "/") followed by the delegate's own
            // unique name.
            String adjustedUniqueName = ((AggregateAnalysisEngineController) getController()).getJmxContext();
            if (adjustedUniqueName.startsWith("p0=")) {
              adjustedUniqueName = adjustedUniqueName.substring(3); // skip p0=
            }
            adjustedUniqueName = adjustedUniqueName.replaceAll(" Components", "");
            if (!adjustedUniqueName.startsWith("/")) {
              adjustedUniqueName = "/" + adjustedUniqueName;
            }
            adjustedUniqueName += delegateMetric.getUniqueName();

            // If the ancestor already holds a metric with this unique name, drop it; the
            // values from this reply take its place. The loop ends right after the removal,
            // so the collection is not iterated again after being modified.
            for (AnalysisEnginePerformanceMetrics met : ancestor.getDelegateMetrics()) {
              if (met.getUniqueName().equals(adjustedUniqueName)) {
                ancestor.getDelegateMetrics().remove(met);
                break;
              }
            }
            adjustedMetrics.add(new AnalysisEnginePerformanceMetrics(delegateMetric.getName(),
                    adjustedUniqueName, delegateMetric.getAnalysisTime(),
                    delegateMetric.getNumProcessed()));
          }
          ancestor.addDelegateMetrics(delegateKey, adjustedMetrics, true); // true=remote
        }
      } catch (Exception e) {
        // An exception may be thrown here if the service is being stopped.
        // The top level controller may have already cleaned up the cache
        // and the cache lookup will throw an exception. Ignore it
        // here, we are shutting down.
      }
    }
    CasStateEntry casStateEntry = ((AggregateAnalysisEngineController) getController())
            .getLocalCache().lookupEntry(casReferenceId);
    if (casStateEntry != null) {
      casStateEntry.setReplyReceived();
      // Set the key of the delegate that returned the CAS
      casStateEntry.setLastDelegate(delegate);
    } else {
      return; // Cache Entry Not found
    }

    cas = cacheEntry.getCas();
    int totalNumberOfParallelDelegatesProcessingCas = casStateEntry
            .getNumberOfParallelDelegates();
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
              "handleProcessResponseFromRemote", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_number_parallel_delegates_FINE",
              new Object[] { totalNumberOfParallelDelegatesProcessingCas, Thread.currentThread().getId(), Thread.currentThread().getName() });
    }
    if (totalNumberOfParallelDelegatesProcessingCas > 1) {
      // Block this thread until the CAS has been dispatched to all delegates in the parallel
      // step. Guards against a race where a reply from one of the parallel delegates arrives
      // before the dispatch sequence completes.
      casStateEntry.blockIfParallelDispatchNotComplete();
    }
    if (cas == null) {
      throw new AsynchAEException(Thread.currentThread().getName()
              + "-Cache Does not contain a CAS. Cas Reference Id::" + casReferenceId);
    }
    // Consult the same logger that the message is written to.
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
              "handleProcessResponseFromRemote", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_rcvd_reply_FINEST",
              new Object[] { aMessageContext.getEndpoint().getEndpoint(), casReferenceId, xmi });
    }
    long t1 = getController().getCpuTime();
    /* --------------------- */
    /** DESERIALIZE THE CAS. */
    /* --------------------- */

    // The delegate did not send a delta CAS: mark the cache entry so that it no longer
    // accepts delta CASes (all subsequent serialization must be a complete CAS).
    if (!aMessageContext.getMessageBooleanProperty(AsynchAEMessage.SentDeltaCas)) {
      cacheEntry.setAcceptsDeltaCas(false);
    }

    SerialFormat serialFormat = endpointWithTimer.getSerialFormat();
    // check if the CAS is part of the Parallel Step
    if (totalNumberOfParallelDelegatesProcessingCas > 1) {
      // Synchronized because replies are merged into the same CAS.
      synchronized (cas) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                  "handleProcessResponseFromRemote", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_delegate_responded_count_FINEST",
                  new Object[] { casStateEntry.howManyDelegatesResponded(), casReferenceId });
        }
        // If a delta CAS, merge it while checking that no pre-existing FSs are modified.
        if (aMessageContext.getMessageBooleanProperty(AsynchAEMessage.SentDeltaCas)) {
          switch (serialFormat) {
            case XMI:
              int highWaterMark = cacheEntry.getHighWaterMark();
              deserialize(xmi, cas, casReferenceId, highWaterMark, AllowPreexistingFS.disallow);
              break;
            case COMPRESSED_FILTERED:
              deserialize(aMessageContext.getByteMessage(), cas, cacheEntry, endpointWithTimer.getTypeSystemImpl(), AllowPreexistingFS.disallow);
              break;
            default:
              throw new UIMARuntimeException(new Exception("Internal error"));
          }
        } else {
          // If not a delta CAS (old service), take all of first reply, and merge in the new
          // entries in the later replies. Ignoring pre-existing FS for 2.2.2 compatibility
          // Note: can't be a compressed binary - that would have returned a delta
          if (casStateEntry.howManyDelegatesResponded() == 0) {
            deserialize(xmi, cas, casReferenceId);
          } else { // process secondary reply from a parallel step
            int highWaterMark = cacheEntry.getHighWaterMark();
            deserialize(xmi, cas, casReferenceId, highWaterMark, AllowPreexistingFS.ignore);
          }
        }
        casStateEntry.incrementHowManyDelegatesResponded();
      }
    } else { // Processing a reply from a non-parallel delegate (binary or delta xmi or xmi)
      byte[] binaryData = aMessageContext.getByteMessage();
      ByteArrayInputStream istream = new ByteArrayInputStream(binaryData);
      switch (serialFormat) {
        case BINARY:
          ((CASImpl) cas).reinit(istream);
          break;
        case COMPRESSED_FILTERED:
          BinaryCasSerDes6 bcs = new BinaryCasSerDes6(cas, (MarkerImpl) cacheEntry.getMarker(), endpointWithTimer.getTypeSystemImpl(), cacheEntry.getCompress6ReuseInfo());
          bcs.deserialize(istream, AllowPreexistingFS.allow);
          break;
        case XMI:
          if (aMessageContext.getMessageBooleanProperty(AsynchAEMessage.SentDeltaCas)) {
            int highWaterMark = cacheEntry.getHighWaterMark();
            deserialize(xmi, cas, casReferenceId, highWaterMark, AllowPreexistingFS.allow);
          } else {
            deserialize(xmi, cas, casReferenceId);
          }
          break;
        default:
          throw new UIMARuntimeException(new Exception("Internal error"));
      }
    }
    long timeToDeserializeCAS = getController().getCpuTime() - t1;

    getController().getServicePerformance().incrementCasDeserializationTime(timeToDeserializeCAS);

    ServicePerformance casStats = getController().getCasStatistics(casReferenceId);
    casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
    LongNumericStatistic statistic;
    if ((statistic = getController().getMonitor().getLongNumericStatistic("",
            Monitor.TotalDeserializeTime)) != null) {
      statistic.increment(timeToDeserializeCAS);
    }

    computeStats(aMessageContext, casReferenceId);

    // Send CAS for processing when all delegates reply.
    // totalNumberOfParallelDelegatesProcessingCas indicates how many delegates are processing
    // the CAS in parallel. With a value of 1 only one delegate processes the CAS at a time.
    // Otherwise, check if all delegates responded before passing the CAS on, so that a CAS
    // processed concurrently only moves to the next step after every delegate has replied.
    // HowManyDelegatesResponded is incremented every time a parallel delegate sends a response.
    if (totalNumberOfParallelDelegatesProcessingCas == 1
            || receivedAllResponsesFromParallelDelegates(casStateEntry,
                    totalNumberOfParallelDelegatesProcessingCas)) {
      super.invokeProcess(cas, casReferenceId, null, aMessageContext, null);
    }

  } catch (Exception e) {
    // Check if the exception was thrown by the Cache while looking up
    // the CAS. It may be the case if in the parallel step one thread
    // drops the CAS in the Error Handling while another thread processes
    // reply from another delegate in the Parallel Step. A race condition
    // may occur here. If one thread drops the CAS due to excessive exceptions
    // and Flow Controller is configured to drop the CAS, the other thread
    // should not be allowed to move the CAS to process() method. The second
    // thread will find the CAS missing in the cache and the logic below
    // just logs the stale CAS and returns and doesn't attempt to handle
    // the missing CAS exception.
    if (e instanceof AsynchAEException && e.getMessage() != null
            && e.getMessage().startsWith("Cas Not Found")) {
      String key = "N/A";
      if (endpointWithTimer != null) {
        key = ((AggregateAnalysisEngineController) getController())
                .lookUpDelegateKey(endpointWithTimer.getEndpoint());
      }
      // Log at the level that the guard checks and that the message key names.
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                "handleProcessResponseFromRemote", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_stale_reply__INFO",
                new Object[] { getController().getComponentName(), key, casReferenceId });
      }
      // The reply came late. The CAS was removed from the cache.
      return;
    }
    ErrorContext errorContext = new ErrorContext();
    errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
    errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
    errorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint());
    getController().getErrorHandlerChain().handle(e, errorContext, getController());
  } finally {
    // Runs on every exit path, including the early returns above.
    incrementDelegateProcessCount(aMessageContext);
  }
}
/**
 * Checks whether the number of delegates that have responded for the given CAS equals the
 * expected number of parallel delegates. When it does, the responded counter of the CAS state
 * entry is reset before returning.
 *
 * @param aCasStateEntry state entry of the CAS being processed in a parallel step
 * @param totalNumberOfParallelDelegatesProcessingCas number of replies expected
 * @return {@code true} when the responded count equals the expected count
 */
private synchronized boolean receivedAllResponsesFromParallelDelegates(
        CasStateEntry aCasStateEntry, int totalNumberOfParallelDelegatesProcessingCas) {
  final boolean everyDelegateReplied =
          aCasStateEntry.howManyDelegatesResponded() == totalNumberOfParallelDelegatesProcessingCas;
  if (everyDelegateReplied) {
    aCasStateEntry.resetDelegateResponded();
  }
  return everyDelegateReplied;
}
/**
 * Deserializes an XMI reply into the given CAS, using the shared deserialization data stored
 * in the cache entry of the CAS, a high water mark and a policy for pre-existing feature
 * structures.
 *
 * @param xmi serialized CAS in XMI form
 * @param cas target CAS
 * @param casReferenceId id used to find the cache entry holding the shared data
 * @param highWaterMark value passed through to the serializer
 * @param allow policy for pre-existing feature structures, passed through to the serializer
 * @throws Exception if the cache lookup or the deserialization fails
 */
private void deserialize(String xmi, CAS cas, String casReferenceId, int highWaterMark,
        AllowPreexistingFS allow) throws Exception {
  final CacheEntry cachedEntry =
          getController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
  final XmiSerializationSharedData sharedData = cachedEntry.getDeserSharedData();
  final UimaSerializer serializer = SerializerCache.lookupSerializerByThreadId();
  serializer.deserializeCasFromXmi(xmi, cas, sharedData, true, highWaterMark, allow);
}
/**
 * Deserializes a binary reply into the given CAS via {@link Serialization#deserializeCAS},
 * using the reuse info kept in the cache entry.
 *
 * @param bytes serialized CAS
 * @param cas target CAS
 * @param cacheEntry cache entry supplying the compress-6 reuse info
 * @param remoteTs type system passed through to the deserializer
 * @param allow policy for pre-existing feature structures, passed through to the deserializer
 * @throws Exception if deserialization fails
 */
private void deserialize(byte[] bytes, CAS cas, CacheEntry cacheEntry, TypeSystemImpl remoteTs,
        AllowPreexistingFS allow) throws Exception {
  Serialization.deserializeCAS(cas, new ByteArrayInputStream(bytes), remoteTs,
          cacheEntry.getCompress6ReuseInfo(), allow);
}
/**
 * Deserializes an XMI reply into the given CAS. The shared deserialization data is taken from
 * the cache entry of the CAS; if the entry has none yet, a new instance is created and stored
 * in the entry first. The serializer is called with {@code -1} as its last argument.
 *
 * @param xmi serialized CAS in XMI form
 * @param cas target CAS
 * @param casReferenceId id used to find the cache entry
 * @throws Exception if the cache lookup or the deserialization fails
 */
private void deserialize(String xmi, CAS cas, String casReferenceId) throws Exception {
  final CacheEntry cachedEntry =
          getController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
  XmiSerializationSharedData sharedData = cachedEntry.getDeserSharedData();
  if (sharedData == null) {
    // First use for this CAS: create the shared data and remember it in the cache entry.
    sharedData = new XmiSerializationSharedData();
    cachedEntry.setXmiSerializationData(sharedData);
  }
  SerializerCache.lookupSerializerByThreadId()
          .deserializeCasFromXmi(xmi, cas, sharedData, true, -1);
}
/**
 * Handles a Process reply that carries only a CAS reference id (the CAS itself is taken from
 * the in-process cache).
 * <p>
 * The reply is recorded on the CAS state entry (if one exists), the CAS id is removed from the
 * delegate's outstanding list, and the CAS is passed on via {@code cancelTimerAndProcess}. A
 * missing CAS is logged and reported as an {@link AsynchAEException}. Every exception is
 * routed to the controller's error handler chain.
 * <p>
 * Afterwards, in all cases, the delegate's process count is incremented. If the controller is
 * an aggregate, the replying endpoint is a CAS multiplier, the cache entry is marked aborted
 * and the in-process cache is not empty, the controller's event listener is registered for a
 * callback when the cache becomes empty.
 *
 * @param aMessageContext the reply message
 */
private void handleProcessResponseWithCASReference(MessageContext aMessageContext) {
  String casReferenceId = null;
  CacheEntry cacheEntry = null;

  try {
    casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
    cacheEntry = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
    CasStateEntry casStateEntry = ((AggregateAnalysisEngineController) getController())
            .getLocalCache().lookupEntry(casReferenceId);

    CAS cas = cacheEntry.getCas();
    String delegateKey = ((AggregateAnalysisEngineController) getController())
            .lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
    Delegate delegate = ((AggregateAnalysisEngineController) getController())
            .lookupDelegate(delegateKey);
    if (casStateEntry != null) {
      casStateEntry.setReplyReceived();
      casStateEntry.setLastDelegate(delegate);
    }
    delegate.removeCasFromOutstandingList(casReferenceId);

    if (cas != null) {
      cancelTimerAndProcess(aMessageContext, casReferenceId, cas);
    } else {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.INFO,
                CLASS_NAME.getName(),
                "handleProcessResponseWithCASReference",
                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_cas_not_in_cache__INFO",
                new Object[] { getController().getName(), casReferenceId,
                    aMessageContext.getEndpoint().getEndpoint() });
      }
      throw new AsynchAEException("CAS with Reference Id:" + casReferenceId
              + " Not Found in CasManager's CAS Cache");
    }
  } catch (Exception e) {
    ErrorContext errorContext = new ErrorContext();
    errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
    errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
    errorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint());
    getController().getErrorHandlerChain().handle(e, errorContext, getController());
  } finally {
    incrementDelegateProcessCount(aMessageContext);
    if (getController() instanceof AggregateAnalysisEngineController) {
      try {
        String endpointName = aMessageContext.getEndpoint().getEndpoint();
        String delegateKey = ((AggregateAnalysisEngineController) getController())
                .lookUpDelegateKey(endpointName);
        if (delegateKey != null) {
          Endpoint endpoint = ((AggregateAnalysisEngineController) getController())
                  .lookUpEndpoint(delegateKey, false);

          // Check if the multiplier aborted during processing of this input CAS.
          // cacheEntry is still null when the cache lookup in the try block failed; that
          // failure has already been reported above, so it must not surface here again as
          // a NullPointerException.
          if (endpoint != null && endpoint.isCasMultiplier() && cacheEntry != null
                  && cacheEntry.isAborted()
                  && !getController().getInProcessCache().isEmpty()) {
            getController().getInProcessCache().registerCallbackWhenCacheEmpty(
                    getController().getEventListener());
          }
          // NOTE(review): when the cache is already empty nothing is done. Earlier revisions
          // questioned whether the listener's onCacheEmpty() should be called in that case.
        }
      } catch (Exception e) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          if (getController() != null) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                    "handleProcessResponseWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_service_exception_WARNING", getController().getComponentName());
          }

          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
                  "handleProcessResponseWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_exception__WARNING", e);
        }
      }
    }
  }
}
/**
 * Increments the {@code Monitor.ProcessCount} statistic of the delegate that sent the given
 * message. Does nothing when the message has no endpoint or the controller is not an
 * aggregate. Any exception raised while resolving the delegate key or the statistic is logged
 * at INFO level and otherwise ignored.
 *
 * @param aMessageContext message whose endpoint identifies the delegate
 */
private void incrementDelegateProcessCount(MessageContext aMessageContext) {
  final Endpoint replyEndpoint = aMessageContext.getEndpoint();
  if (replyEndpoint == null
          || !(getController() instanceof AggregateAnalysisEngineController)) {
    return;
  }
  try {
    final AggregateAnalysisEngineController aggregate =
            (AggregateAnalysisEngineController) getController();
    final String key = aggregate.lookUpDelegateKey(replyEndpoint.getEndpoint());
    final LongNumericStatistic processCount =
            getController().getMonitor().getLongNumericStatistic(key, Monitor.ProcessCount);
    processCount.increment();
  } catch (Exception e) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
              "incrementDelegateProcessCount", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_delegate_key_for_endpoint_not_found__INFO",
              new Object[] { getController().getComponentName(), replyEndpoint.getEndpoint() });
    }
  }
}
/**
 * Tells whether the given object is a {@link Throwable}. Every {@link Exception} is a
 * {@code Throwable}, so a single check covers both.
 *
 * @param object object to test; may be {@code null}
 * @return {@code true} when the object is a {@code Throwable}
 */
private boolean isException(Object object) {
  return object instanceof Throwable;
}
/**
 * Tells whether the given object is a {@link UimaEEServiceException} whose cause is a
 * {@link ServiceShutdownException}.
 *
 * @param object object to test; may be {@code null}
 * @return {@code true} only for a service exception caused by a service shutdown
 */
private boolean isShutdownException(Object object) {
  if (!(object instanceof Exception) || !(object instanceof UimaEEServiceException)) {
    return false;
  }
  // instanceof is false for null, so a missing cause yields false as well.
  final Throwable cause = ((UimaEEServiceException) object).getCause();
  return cause instanceof ServiceShutdownException;
}
/**
 * Decides whether the payload of an exception reply should be ignored. It is ignored when it
 * is {@code null}, when it is not an exception, or when it is a shutdown exception.
 *
 * @param object payload of the reply; may be {@code null}
 * @return {@code true} when the payload should not be handled as a delegate error
 */
private boolean ignoreException(Object object) {
  return object == null || !isException(object) || isShutdownException(object);
}
/**
 * Handles a reply whose payload is an exception raised by a delegate.
 * <p>
 * For a Process response the delegate's process count is incremented first. The exception is
 * read from the object payload; when there is none, a non-null string payload is wrapped in a
 * {@link UimaEEServiceException} (this may be how a C++ service reports an error). Payloads
 * that {@code ignoreException} rejects end the handling silently. For a
 * CollectionProcessComplete reply in an aggregate, the controller is told that the delegate's
 * CPC reply failed; otherwise the CAS reference id is read from the message. Finally the
 * exception is wrapped in a {@link UimaAsDelegateException} and passed to the controller's
 * error handler chain.
 *
 * @param aMessageContext the reply message
 * @param delegateKey key (or endpoint name) of the delegate that sent the reply
 */
private synchronized void handleProcessResponseWithException(MessageContext aMessageContext,
        String delegateKey) {
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
    UIMAFramework.getLogger(CLASS_NAME)
            .logrb(
                    Level.FINE,
                    CLASS_NAME.getName(),
                    "handleProcessResponseWithException",
                    UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_handling_exception_from_delegate_FINE",
                    new Object[] { getController().getName(),
                        aMessageContext.getEndpoint().getEndpoint() });
  }
  boolean isCpCError = false;
  String casReferenceId = null;

  try {
    // If a Process Request, increment number of docs processed
    if (aMessageContext.getMessageIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Response
            && aMessageContext.getMessageIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.Process) {
      // Increment number of CASes processed by a delegate
      incrementDelegateProcessCount(aMessageContext);
    }
    Object object = aMessageContext.getObjectMessage();
    if (object == null) {
      // Could be a C++ exception. In this case the exception is just a String in the message
      // cargo
      if (aMessageContext.getStringMessage() != null) {
        object = new UimaEEServiceException(aMessageContext.getStringMessage());
      }
    }
    if (ignoreException(object)) {
      return;
    }

    if (getController() instanceof AggregateAnalysisEngineController
            && aMessageContext.propertyExists(AsynchAEMessage.Command)
            && aMessageContext.getMessageIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.CollectionProcessComplete) {
      isCpCError = true;
      ((AggregateAnalysisEngineController) getController())
              .processCollectionCompleteReplyFromDelegate(delegateKey, false);
    } else {
      casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
    }

    if (object instanceof Throwable) {
      // The payload is accepted as any Throwable, but the delegate exception is built from
      // an Exception. Wrap a Throwable that is not an Exception (for example an Error)
      // instead of casting it, which would fail with a ClassCastException.
      Exception delegateFailure = (object instanceof Exception)
              ? (Exception) object
              : new Exception((Throwable) object);

      String casid_msg = (casReferenceId == null) ? "" : " on CAS:" + casReferenceId;
      String controllerName = "/" + getController().getComponentName();
      if (!getController().isTopLevelComponent()) {
        controllerName += ((BaseAnalysisEngineController) getController().getParentController())
                .getUimaContextAdmin().getQualifiedContextName();
      }
      Exception remoteException = new UimaAsDelegateException("----> Controller:"
              + controllerName + " Received Exception " + casid_msg + " From Delegate:"
              + delegateKey, delegateFailure);
      ErrorContext errorContext = new ErrorContext();
      errorContext.add(AsynchAEMessage.Command, aMessageContext
              .getMessageIntProperty(AsynchAEMessage.Command));
      errorContext.add(AsynchAEMessage.MessageType, aMessageContext
              .getMessageIntProperty(AsynchAEMessage.MessageType));
      // A CollectionProcessComplete error is not tied to a CAS.
      if (!isCpCError) {
        errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
      }
      errorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint());
      getController().getErrorHandlerChain().handle(remoteException, errorContext,
              getController());
    }
  } catch (Exception e) {
    ErrorContext errorContext = new ErrorContext();
    errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
    errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
    errorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint());
    getController().getErrorHandlerChain().handle(e, errorContext, getController());
  }
}
/**
 * Handles a CollectionProcessComplete reply without payload. When the controller is an
 * aggregate it is told that the given delegate replied successfully; any exception is passed
 * to the controller's error handler chain.
 *
 * @param aMessageContext the reply message; its endpoint is added to the error context
 * @param delegateKey key (or endpoint name) of the delegate that replied
 */
private void handleCollectionProcessCompleteReply(MessageContext aMessageContext,
        String delegateKey) {
  try {
    final AnalysisEngineController owner = getController();
    if (owner instanceof AggregateAnalysisEngineController) {
      final AggregateAnalysisEngineController aggregate =
              (AggregateAnalysisEngineController) owner;
      aggregate.processCollectionCompleteReplyFromDelegate(delegateKey, true);
    }
  } catch (Exception e) {
    final ErrorContext cpcErrorContext = new ErrorContext();
    cpcErrorContext.add(AsynchAEMessage.Command, AsynchAEMessage.CollectionProcessComplete);
    cpcErrorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint());
    getController().getErrorHandlerChain().handle(e, cpcErrorContext, getController());
  }
}
/**
 * Resets the {@code ProcessErrorCount} and {@code ProcessErrorRetryCount} counting statistics
 * of the given delegate on the controller's monitor.
 *
 * @param aDelegate key of the delegate whose error counters are reset
 */
private void resetErrorCounts(String aDelegate) {
  final Monitor monitor = getController().getMonitor();
  monitor.resetCountingStatistic(aDelegate, Monitor.ProcessErrorCount);
  monitor.resetCountingStatistic(aDelegate, Monitor.ProcessErrorRetryCount);
}
/**
 * Handles a Ping reply. No processing is performed for such a reply.
 *
 * @param aMessageContext the Ping reply message (unused)
 */
private void handlePingReply(MessageContext aMessageContext) {
  // Nothing to do. There is no statement here that could throw, so no exception handling
  // or logging is needed.
}
/**
 * Handles a ServiceInfo reply. When the reply names a CAS that has a state entry in the local
 * cache, the reply's endpoint is stored on that entry as the free-CAS notification endpoint
 * and, if the message carries a server IP, that IP is stored as the host processing the CAS.
 * Exceptions are logged at WARNING level and not propagated.
 *
 * @param messageContext the ServiceInfo reply message
 */
private void handleServiceInfoReply(MessageContext messageContext) {
  try {
    final String casId = messageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
    if (casId == null) {
      return; // nothing to do
    }
    final Endpoint notificationEndpoint = messageContext.getEndpoint();
    final CasStateEntry stateEntry = ((AggregateAnalysisEngineController) getController())
            .getLocalCache().lookupEntry(casId);
    if (stateEntry == null) {
      return;
    }
    stateEntry.setFreeCasNotificationEndpoint(notificationEndpoint);
    // The reply may carry the IP of the host where the service processing the CAS runs.
    final String hostIp = messageContext.getMessageStringProperty(AsynchAEMessage.ServerIP);
    if (hostIp != null) {
      stateEntry.setHostIpProcessingCAS(hostIp);
    }
  } catch (Exception e) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
            "handleServiceInfoReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_exception__WARNING", e);
  }
}
/**
 * Entry point of this handler. Response messages with command Process, ACK, ServiceInfo or
 * CollectionProcessComplete are dispatched here according to their payload and command; every
 * other message is passed to the next handler in the chain.
 * <p>
 * In an aggregate the delegate key is resolved from the endpoint name and, for a remote
 * endpoint that supplies it, the {@code EndpointServer} property. After a CAS-reference or
 * serialized-CAS reply has been handled, the delegate's error counters are reset.
 *
 * @param anObjectToHandle the message; validated by the superclass and used as a
 *          {@link MessageContext}
 * @throws AsynchAEException when the payload/command combination is not recognized, or when
 *           reading message properties fails
 */
public void handle(Object anObjectToHandle) throws AsynchAEException {
  super.validate(anObjectToHandle);
  final MessageContext messageContext = (MessageContext) anObjectToHandle;

  final boolean handledHere =
          isHandlerForMessage(messageContext, AsynchAEMessage.Response, AsynchAEMessage.Process)
          || isHandlerForMessage(messageContext, AsynchAEMessage.Response, AsynchAEMessage.ACK)
          || isHandlerForMessage(messageContext, AsynchAEMessage.Response,
                  AsynchAEMessage.ServiceInfo)
          || isHandlerForMessage(messageContext, AsynchAEMessage.Response,
                  AsynchAEMessage.CollectionProcessComplete);
  if (!handledHere) {
    // Not a reply this handler deals with. Delegate to the next handler in the chain.
    super.delegate(messageContext);
    return;
  }

  final int payload = messageContext.getMessageIntProperty(AsynchAEMessage.Payload);
  final int command = messageContext.getMessageIntProperty(AsynchAEMessage.Command);
  final Endpoint replyEndpoint = messageContext.getEndpoint();
  final String delegate = replyEndpoint.getEndpoint();

  String key = null;
  if (getController() instanceof AggregateAnalysisEngineController) {
    String fromServer = null;
    // If an old service does not echo back the external broker name then the queue name must
    // be unique. The ServerURI set by the service can't be used as it may be its local name
    // for the broker, e.g. tcp://localhost:61616
    if (replyEndpoint.isRemote()
            && messageContext.propertyExists(AsynchAEMessage.EndpointServer)) {
      fromServer = messageContext.getMessageStringProperty(AsynchAEMessage.EndpointServer);
    }
    key = ((AggregateAnalysisEngineController) getController()).lookUpDelegateKey(delegate,
            fromServer);
  }

  final boolean noPayload = AsynchAEMessage.None == payload;
  if (AsynchAEMessage.CASRefID == payload) {
    handleProcessResponseWithCASReference(messageContext);
    if (key != null) {
      resetErrorCounts(key);
    }
  } else if (AsynchAEMessage.XMIPayload == payload || AsynchAEMessage.BinaryPayload == payload) {
    handleProcessResponseFromRemote(messageContext, key);
    if (key != null) {
      resetErrorCounts(key);
    }
  } else if (AsynchAEMessage.Exception == payload) {
    // Fall back to the endpoint name when no delegate key could be resolved.
    handleProcessResponseWithException(messageContext, (key != null) ? key : delegate);
  } else if (noPayload && AsynchAEMessage.CollectionProcessComplete == command) {
    handleCollectionProcessCompleteReply(messageContext, (key != null) ? key : delegate);
  } else if (noPayload && AsynchAEMessage.ACK == command) {
    handleACK(messageContext, key);
  } else if (noPayload && AsynchAEMessage.Ping == command) {
    handlePingReply(messageContext);
  } else if (noPayload && AsynchAEMessage.ServiceInfo == command) {
    handleServiceInfoReply(messageContext);
  } else {
    throw new AsynchAEException("Invalid Payload. Expected XMI or CasReferenceId Instead Got::"
            + payload);
  }
}
/**
 * Handles an ACK reply. The body is empty: no action is taken for an ACK.
 *
 * @param aMessageContext the ACK message (unused)
 * @param key delegate key resolved by the caller; may be {@code null} (unused)
 * @throws AsynchAEException declared but never thrown by the current empty body
 */
private void handleACK(MessageContext aMessageContext, String key) throws AsynchAEException {
}
}