blob: 83c2a14eed14bff53526b0a1f0a91b54b0e031ae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.aae.controller;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.AsynchAECasManager;
import org.apache.uima.aae.InProcessCache;
import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.delegate.ControllerDelegate;
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.ErrorContext;
import org.apache.uima.aae.error.ErrorHandler;
import org.apache.uima.aae.error.ServiceShutdownException;
import org.apache.uima.aae.error.UimaEEServiceException;
import org.apache.uima.aae.error.UnknownDestinationException;
import org.apache.uima.aae.jmx.AggregateServiceInfo;
import org.apache.uima.aae.jmx.JmxManagement;
import org.apache.uima.aae.jmx.PrimitiveServiceInfo;
import org.apache.uima.aae.jmx.ServiceErrors;
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.jmx.ServicePerformance;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.monitor.Monitor;
import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
import org.apache.uima.aae.monitor.statistics.Statistic;
import org.apache.uima.aae.spi.transport.UimaMessage;
import org.apache.uima.aae.spi.transport.UimaTransport;
import org.apache.uima.analysis_engine.AnalysisEngineDescription;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.analysis_engine.asb.impl.FlowContainer;
import org.apache.uima.analysis_engine.asb.impl.FlowControllerContainer;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.cas.CAS;
import org.apache.uima.flow.FinalStep;
import org.apache.uima.flow.ParallelStep;
import org.apache.uima.flow.SimpleStep;
import org.apache.uima.flow.Step;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.resource.metadata.ResourceMetaData;
import org.apache.uima.util.Level;
import org.apache.uima.util.XMLInputSource;
public class AggregateAnalysisEngineController_impl extends BaseAnalysisEngineController implements
AggregateAnalysisEngineController, AggregateAnalysisEngineController_implMBean {
/**
* Serialization version identifier for this class.
*/
private static final long serialVersionUID = 1L;
// Class object handed to UIMAFramework.getLogger() and used as the source class name in log records
private static final Class CLASS_NAME = AggregateAnalysisEngineController_impl.class;
// Index constants; not referenced in the visible part of this file - presumably slots of the
// Object[] values stored in delegateStatMap (TODO confirm against the rest of the file)
private static final int SERVICE_INFO_INDX = 0;
private static final int SERVICE_PERF_INDX = 1;
private static final int SERVICE_ERROR_INDX = 2;
// Flow cache: CAS reference id -> FlowContainer. Entries are added in process() and removed in
// dropCAS()/dropFlow()
private ConcurrentHashMap flowMap = new ConcurrentHashMap();
// Delegate key -> Endpoint. Assigned in mapEndpointsToKeys()
private ConcurrentHashMap destinationMap;
// Endpoint name -> delegate key. Lazily created in mapEndpointsToKeys()
private Map destinationToKeyMap;
private volatile boolean typeSystemsMerged = false;
private AnalysisEngineMetaData aggregateMetadata;
private HashMap analysisEngineMetaDataMap = new HashMap();
// Keys of delegates that have been disabled; additions are made under a lock on the list itself
// in disableDelegates()
private List disabledDelegateList = new ArrayList();
private List remoteCasMultiplierList = new ArrayList();
private String descriptor;
// Flow controller wrapper. Also used as the monitor that serializes calls into flow objects
private transient FlowControllerContainer flowControllerContainer;
private String flowControllerDescriptor;
// CAS reference id -> Endpoint the CAS came from (see addMessageOrigin())
private ConcurrentHashMap originMap = new ConcurrentHashMap();
private String controllerBeanName = null;
private String serviceEndpointName = null;
protected volatile boolean initialized = false;
// Child controllers registered via registerChildController()
protected List<AnalysisEngineController> childControllerList = new ArrayList<AnalysisEngineController>();
// Delegate key -> statistics Map, stored by saveStatsFromService()
private ConcurrentHashMap delegateStats = new ConcurrentHashMap();
private AggregateServiceInfo serviceInfo = null;
private int remoteIndex = 1;
private volatile boolean requestForMetaSentToRemotes = false;
private ConcurrentHashMap<String, Object[]> delegateStatMap = new ConcurrentHashMap<String, Object[]>();
public final Object parallelStepMux = new Object();
// prevents more than one thread from calling collectionProcessComplete on the FC
private volatile boolean doSendCpcReply = false;
// Guards FC's next() method. Single thread access is allowed at any given time
private Semaphore flowSemaphore = new Semaphore(1);
/**
 * Creates an aggregate controller that has no parent controller. Forwards to the constructor
 * that accepts a parent, supplying {@code null} for it.
 *
 * @param anEndpointName
 *          name of the endpoint, passed through unchanged
 * @param aDescriptor
 *          descriptor, passed through unchanged
 * @param aCasManager
 *          CAS manager, passed through unchanged
 * @param anInProcessCache
 *          cache of in-process CASes, passed through unchanged
 * @param aDestinationMap
 *          destination map, passed through unchanged
 * @throws Exception
 *           propagated from the delegated constructor
 */
public AggregateAnalysisEngineController_impl(String anEndpointName, String aDescriptor,
        AsynchAECasManager aCasManager, InProcessCache anInProcessCache, Map aDestinationMap)
        throws Exception {
  this((AnalysisEngineController) null, anEndpointName, aDescriptor, aCasManager,
          anInProcessCache, aDestinationMap);
}
/**
 * Creates an aggregate controller with the given parent and no JMX management object. Forwards
 * to the constructor that accepts a {@code JmxManagement}, supplying {@code null} for it.
 *
 * @param aParentController
 *          parent controller, passed through unchanged
 * @param anEndpointName
 *          name of the endpoint, passed through unchanged
 * @param aDescriptor
 *          descriptor, passed through unchanged
 * @param aCasManager
 *          CAS manager, passed through unchanged
 * @param anInProcessCache
 *          cache of in-process CASes, passed through unchanged
 * @param aDestinationMap
 *          destination map, passed through unchanged
 * @throws Exception
 *           propagated from the delegated constructor
 */
public AggregateAnalysisEngineController_impl(AnalysisEngineController aParentController,
        String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager,
        InProcessCache anInProcessCache, Map aDestinationMap) throws Exception {
  this(aParentController, anEndpointName, aDescriptor, aCasManager, anInProcessCache,
          aDestinationMap, (JmxManagement) null);
}
/**
 * Full constructor. Hands every argument to the base class constructor (together with a
 * constant {@code 0} as its second argument) and then calls {@code initialize()}.
 *
 * @param aParentController
 *          parent controller, or {@code null}
 * @param anEndpointName
 *          name of the endpoint
 * @param aDescriptor
 *          descriptor
 * @param aCasManager
 *          CAS manager
 * @param anInProcessCache
 *          cache of in-process CASes
 * @param aDestinationMap
 *          destination map
 * @param aJmxManagement
 *          JMX management object, or {@code null}
 * @throws Exception
 *           propagated from the base class constructor or from {@code initialize()}
 */
public AggregateAnalysisEngineController_impl(AnalysisEngineController aParentController,
        String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager,
        InProcessCache anInProcessCache, Map aDestinationMap, JmxManagement aJmxManagement)
        throws Exception {
  super(aParentController, 0, anEndpointName, aDescriptor, aCasManager, anInProcessCache,
          aDestinationMap, aJmxManagement);
  initialize();
}
/**
 * Adds a child controller to the list of children kept by this aggregate. The addition is made
 * while holding the lock on the list.
 *
 * @param aChildController
 *          controller to register
 * @param aDelegateKey
 *          not used by this method
 * @throws Exception
 *           declared for callers; nothing in this body throws a checked exception
 */
public void registerChildController(AnalysisEngineController aChildController, String aDelegateKey)
        throws Exception {
  final boolean fineEnabled = UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE);
  if (fineEnabled) {
    Object[] logArgs = new Object[] { getComponentName(), aChildController.getComponentName() };
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
            "registerChildController", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_register_controller__FINE", logArgs);
  }
  synchronized (childControllerList) {
    childControllerList.add(aChildController);
  }
}
/**
 * Stores the statistics reported by a service under the delegate key that
 * {@code lookUpDelegateKey()} returns for the given endpoint name.
 *
 * @param aServiceEndpointName
 *          endpoint name of the reporting service
 * @param aServiceStats
 *          statistics to store
 */
public void saveStatsFromService(String aServiceEndpointName, Map aServiceStats) {
  delegateStats.put(lookUpDelegateKey(aServiceEndpointName), aServiceStats);
}
/**
 * Remembers the endpoint a CAS came from so that it can later be retrieved with
 * {@code getMessageOrigin()}.
 *
 * A {@code null} endpoint is reported on standard output and otherwise ignored: the origin map
 * is a {@code ConcurrentHashMap}, which does not accept null values, so attempting to store it
 * would throw a {@code NullPointerException}.
 *
 * @param aCasReferenceId
 *          reference id of the CAS; must not be {@code null} (the map rejects null keys)
 * @param anEndpoint
 *          endpoint the CAS was received from
 */
public void addMessageOrigin(String aCasReferenceId, Endpoint anEndpoint) {
  if (anEndpoint == null) {
    System.out.println("Controller:" + getComponentName()
            + " Endpoint is NULL. Cas Reference Id:" + aCasReferenceId);
    // Nothing to record; storing null would fail inside ConcurrentHashMap.put()
    return;
  }
  originMap.put(aCasReferenceId, anEndpoint);
  // The previous FINE-level dump of the whole origin map assembled a string that was never
  // logged (the log call had been commented out), so that work has been dropped.
}
/**
 * Tells whether a delegate has been disabled. The comparison against the disabled delegate keys
 * ignores case.
 *
 * The scan holds the lock on the list, matching the lock taken when keys are added in
 * {@code disableDelegates()}; without it a concurrent add could make the iterator fail with a
 * {@code ConcurrentModificationException}.
 *
 * @param aDelegateKey
 *          key of the delegate to check; may be {@code null}
 * @return {@code true} if the key is in the disabled delegate list, {@code false} otherwise or
 *         when the key is {@code null}
 */
public boolean isDelegateDisabled(String aDelegateKey) {
  if (aDelegateKey == null) {
    return false;
  }
  synchronized (disabledDelegateList) {
    for (Object disabledKey : disabledDelegateList) {
      if (aDelegateKey.equalsIgnoreCase((String) disabledKey)) {
        return true;
      }
    }
  }
  return false;
}
/**
 * Records the name of the service endpoint. For a top level component the name is also
 * published in the {@code ServiceInputQueueName} system property, so that a collocated client
 * application can determine where to send messages.
 *
 * @param anEndpointName
 *          name of the service endpoint
 */
public void setServiceEndpointName(String anEndpointName) {
  this.serviceEndpointName = anEndpointName;
  if (!isTopLevelComponent()) {
    return;
  }
  System.setProperty("ServiceInputQueueName", serviceEndpointName);
}
/**
 * Returns the endpoint name most recently given to {@code setServiceEndpointName()}.
 *
 * @return the service endpoint name, or {@code null} if none has been set
 */
public String getServiceEndpointName() {
  return this.serviceEndpointName;
}
/**
 * Records the bean name of this controller. For a top level component the name is also
 * published in the {@code Controller} system property.
 *
 * @param aBeanName
 *          bean name of the controller
 */
public void setControllerBeanName(String aBeanName) {
  this.controllerBeanName = aBeanName;
  if (!isTopLevelComponent()) {
    return;
  }
  System.setProperty("Controller", controllerBeanName);
}
/**
 * Looks up the endpoint recorded as the origin of a CAS.
 *
 * @param aCasReferenceId
 *          reference id of the CAS
 * @return the recorded origin endpoint, or {@code null} when nothing is recorded for the id
 */
public Endpoint getMessageOrigin(String aCasReferenceId) {
  // A single get() yields null for an absent key, which is what the caller expects
  Object recordedOrigin = originMap.get(aCasReferenceId);
  return (Endpoint) recordedOrigin;
}
/**
 * Forgets the origin endpoint recorded for a CAS, if any, and logs the removal at FINEST level.
 *
 * The removal relies on the value returned by {@code remove()} instead of a separate
 * {@code containsKey()} check, so that when several threads race to remove the same id only the
 * thread that actually removed the entry logs it.
 *
 * @param aCasReferenceId
 *          reference id of the CAS
 */
public void removeMessageOrigin(String aCasReferenceId) {
  if (originMap.remove(aCasReferenceId) == null) {
    // No entry was recorded for this id
    return;
  }
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
            "removeMessageOrigin", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_remove_msg_origin_entry__FINEST",
            new Object[] { getComponentName(), aCasReferenceId });
  }
}
/**
 * Removes the flow cached for a CAS, if there is one, and then lets the base class drop the CAS.
 *
 * @param aCasReferenceId
 *          reference id of the CAS to drop
 * @param dropCacheEntry
 *          passed through to the base class implementation
 */
public void dropCAS(String aCasReferenceId, boolean dropCacheEntry) {
  boolean flowIsCached = lookupFlow(aCasReferenceId) != null;
  if (flowIsCached) {
    flowMap.remove(aCasReferenceId);
  }
  super.dropCAS(aCasReferenceId, dropCacheEntry);
}
/**
 * Removes the flow cached for a CAS. Does nothing when no flow is cached for the id.
 *
 * @param aCasReferenceId
 *          reference id of the CAS whose flow should be removed
 * @param abortFlow
 *          when {@code true}, {@code aborted()} is called on the flow (while holding the lock on
 *          the flow controller container) before it is removed
 */
public void dropFlow(String aCasReferenceId, boolean abortFlow) {
  FlowContainer cachedFlow = lookupFlow(aCasReferenceId);
  if (cachedFlow == null) {
    return;
  }
  if (abortFlow) {
    synchronized (flowControllerContainer) {
      cachedFlow.aborted();
    }
  }
  flowMap.remove(aCasReferenceId);
}
/**
 * Adopts the given map of delegate key to endpoint as this controller's destination map. For
 * every non-null endpoint in it, a {@code ControllerDelegate} is created and added to the
 * delegate list, the delegate key is stored on the endpoint, and the endpoint name is mapped
 * back to the delegate key.
 *
 * @param aDestinationMap
 *          delegate key to endpoint map
 */
public void mapEndpointsToKeys(ConcurrentHashMap aDestinationMap) {
  destinationMap = aDestinationMap;
  Iterator mappings = destinationMap.entrySet().iterator();
  while (mappings.hasNext()) {
    Map.Entry mapping = (Map.Entry) mappings.next();
    Endpoint target = (Endpoint) mapping.getValue();
    if (target == null) {
      continue;
    }
    String delegateKey = (String) mapping.getKey();
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
              "mapEndpointsToKeys", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_endpoint_to_key_map__FINE",
              new Object[] { getName(), delegateKey, target.getEndpoint() });
    }
    // The reverse map is created only once the first usable endpoint is seen
    if (destinationToKeyMap == null) {
      destinationToKeyMap = new HashMap();
    }
    // Build the Delegate for this endpoint, carrying over the endpoint's timeouts
    Delegate newDelegate = new ControllerDelegate(delegateKey, this);
    newDelegate.setCasProcessTimeout(target.getProcessRequestTimeout());
    newDelegate.setGetMetaTimeout(target.getMetadataRequestTimeout());
    newDelegate.setEndpoint(target);
    delegates.add(newDelegate);
    target.setDelegateKey(delegateKey);
    destinationToKeyMap.put(target.getEndpoint(), delegateKey);
  }
}
/**
 * Clears the "completed processing collection" flag on every endpoint in the destination map
 * whose status is {@code Endpoint.OK}.
 */
private void resetEndpointsCpcStatus() {
  Iterator mappings = destinationMap.entrySet().iterator();
  while (mappings.hasNext()) {
    Endpoint candidate = (Endpoint) ((Map.Entry) mappings.next()).getValue();
    if (candidate == null) {
      continue;
    }
    if (candidate.getStatus() == Endpoint.OK) {
      candidate.setCompletedProcessingCollection(false);
    }
  }
}
/**
 * Checks whether every endpoint in the destination map with status {@code Endpoint.OK} has its
 * "completed processing collection" flag set. When that is the case the flags are cleared again
 * so that the next CPC request starts from a clean state.
 *
 * @return {@code true} if all such endpoints have completed; {@code false} as soon as one that
 *         has not completed is found (no flags are cleared in that case)
 */
private synchronized boolean allDelegatesCompletedCollection() {
  Iterator mappings = destinationMap.entrySet().iterator();
  while (mappings.hasNext()) {
    Endpoint candidate = (Endpoint) ((Map.Entry) mappings.next()).getValue();
    boolean stillPending = candidate != null && candidate.getStatus() == Endpoint.OK
            && !candidate.completedProcessingCollection();
    if (stillPending) {
      return false;
    }
  }
  // Every delegate replied to CPC; get the endpoints ready for the next CPC request
  resetEndpointsCpcStatus();
  return true;
}
/**
 * Returns the map that {@code saveStatsFromService()} fills with per-delegate statistics.
 *
 * @return the live statistics map itself, not a copy
 */
public Map getDelegateStats() {
  return this.delegateStats;
}
/**
* Handles a CollectionProcessComplete (CPC) reply received from a delegate. The delegate's
* endpoint is marked as having completed the collection and its timer is cancelled. When the
* conditions checked below are met, collectionProcessComplete() is invoked on the flow
* controller and a CPC reply is sent to the client.
*
* @param aDelegateKey
* - first used directly as a key into the destination map; if no endpoint is found, the
* key returned by lookUpDelegateKey() is tried instead
* @param sendReply
* - when false, this call does not itself start the reply sequence
* @throws AsynchAEException
* - if no endpoint can be found, or wrapping any other exception raised here
*/
public void processCollectionCompleteReplyFromDelegate(String aDelegateKey, boolean sendReply)
throws AsynchAEException {
try {
Endpoint endpoint = (Endpoint) destinationMap.get(aDelegateKey);
// Resolved unconditionally because the key is also used in the log message below
String key = lookUpDelegateKey(aDelegateKey);
if (endpoint == null) {
endpoint = (Endpoint) destinationMap.get(key);
if (endpoint == null) {
throw new AsynchAEException("Unable to find Endpoint Object Using:" + aDelegateKey);
}
}
endpoint.cancelTimer();
endpoint.setCompletedProcessingCollection(true);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"processCollectionCompleteReplyFromDelegate",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_recvd_cpc_reply__FINE",
new Object[] { key });
}
Endpoint cEndpoint = null;
// synchronized to prevent more than one thread from calling collectionProcessComplete() on
// the Flow Controller.
if (flowControllerContainer != null) {
synchronized (flowControllerContainer) {
// Note: allDelegatesCompletedCollection() also clears the delegates' completion flags when
// it returns true, and cEndpoint is assigned as part of evaluating this condition
if (doSendCpcReply == false && sendReply && allDelegatesCompletedCollection()
&& ((cEndpoint = getClientEndpoint()) != null)) {
doSendCpcReply = true;
flowControllerContainer.collectionProcessComplete();
}
}
}
// Reply to a client once for each CPC request. doSendCpcReply is volatile thus
// no need to synchronize it
// NOTE(review): the flag is read here outside the lock, so a second thread entering between
// the assignment above and the reset below appears able to send a duplicate reply (with a
// null cEndpoint, which sendCpcReply() resolves itself) - confirm whether callers prevent this.
if (doSendCpcReply) {
sendCpcReply(cEndpoint);
doSendCpcReply = false; // reset for the next CPC
}
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
/**
 * Sends the CollectionProcessComplete reply to the client. Before replying, the completion flag
 * of every delegate endpoint is cleared and statistics are logged for each delegate and for
 * this controller; after replying, the statistics are cleared.
 *
 * @param aClientEndpoint
 *          endpoint to reply to; when {@code null} the endpoint returned by
 *          {@code getClientEndpoint()} is used
 * @throws Exception
 *           propagated from the logging, transport or output channel calls
 */
private void sendCpcReply(Endpoint aClientEndpoint) throws Exception {
  for (Iterator keys = destinationMap.keySet().iterator(); keys.hasNext();) {
    String delegateKey = (String) keys.next();
    Endpoint delegateEndpoint = (Endpoint) destinationMap.get(delegateKey);
    if (delegateEndpoint != null) {
      // make the endpoint ready for the next run
      delegateEndpoint.setCompletedProcessingCollection(false);
    }
    logStats(delegateKey, getDelegateServicePerformance(delegateKey));
  }
  // Stats of this controller itself
  logStats(getComponentName(), servicePerformance);
  endProcess(AsynchAEMessage.Process);

  Endpoint replyTarget = (aClientEndpoint != null) ? aClientEndpoint : getClientEndpoint();
  if (replyTarget.isRemote()) {
    getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, replyTarget);
  } else {
    // Client is not remote: reply over the internal (non-jms) transport
    UimaTransport transport = getTransport(replyTarget.getEndpoint());
    UimaMessage reply = transport.produceMessage(AsynchAEMessage.CollectionProcessComplete,
            AsynchAEMessage.Response, getName());
    transport.getUimaMessageDispatcher(replyTarget.getEndpoint()).dispatch(reply);
  }
  clearStats();
}
/**
 * Records the flow controller descriptor. The assignment is made while holding this
 * controller's monitor.
 *
 * @param aFlowControllerDescriptor
 *          flow controller descriptor to record
 */
public synchronized void setFlowControllerDescriptor(String aFlowControllerDescriptor) {
  this.flowControllerDescriptor = aFlowControllerDescriptor;
}
/**
 * Blocks the calling thread until the in-process cache is empty, polling it roughly every 10
 * milliseconds. The cache contents are dumped once before polling starts.
 *
 * If the thread is interrupted while waiting, its interrupt status is restored before the
 * interruption is reported, so that code further up the stack can still observe it.
 *
 * @param anEndpoint
 *          not used by this method
 * @throws AsynchAEException
 *           wrapping the interruption or any other exception raised while waiting
 */
private void waitUntilAllCasesAreProcessed(Endpoint anEndpoint) throws AsynchAEException {
  try {
    getInProcessCache().dumpContents(getComponentName());
    // The cache is re-fetched on every pass, as before
    while (!getInProcessCache().isEmpty()) {
      Thread.sleep(10);
    }
  } catch (InterruptedException e) {
    // Do not lose the interruption: re-assert the flag before converting the exception
    Thread.currentThread().interrupt();
    throw new AsynchAEException(e);
  } catch (Exception e) {
    throw new AsynchAEException(e);
  }
}
/**
 * Runs {@code handleAction()} with the given arguments. Any exception it raises is logged at
 * WARNING level and not propagated.
 *
 * @param anAction
 *          action to take
 * @param anEndpointName
 *          name of the endpoint the action applies to
 * @param anErrorContext
 *          context of the error that led to the action
 */
public void takeAction(String anAction, String anEndpointName, ErrorContext anErrorContext) {
  try {
    handleAction(anAction, anEndpointName, anErrorContext);
  } catch (Exception failure) {
    if (!UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      return;
    }
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
            "takeAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_exception__WARNING", new Object[] { failure });
  }
}
/**
 * Handles a CollectionProcessComplete (CPC) request. The client endpoint is cached, the method
 * waits for the in-process cache to drain, and then a CPC request is forwarded to every
 * delegate endpoint whose status is {@code Endpoint.OK}. If the destination map is empty, the
 * CPC reply is sent to the client right away.
 *
 * @param anEndpoint
 *          endpoint of the client that issued the CPC request
 * @throws AsynchAEException
 *           if waiting for the cache to drain fails, or propagated from sending a request to a
 *           remote delegate
 */
public void collectionProcessComplete(Endpoint anEndpoint) throws AsynchAEException {
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
            "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_cpc__FINEST", new Object[] { getName() });
  }
  getInProcessCache().dumpContents(getComponentName());
  localCache.dumpContents();
  cacheClientEndpoint(anEndpoint);
  // The cache holds in-process CASes; a CAS leaves the cache once it is completely processed.
  // Block here until nothing is left in it.
  waitUntilAllCasesAreProcessed(anEndpoint);
  anEndpoint.setCommand(AsynchAEMessage.CollectionProcessComplete);
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
            "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_cpc_all_cases_processed__FINEST", new Object[] { getName() });
  }

  // Special case: an empty destination map (e.g. ALL delegates disabled). Reply immediately.
  if (destinationMap.size() == 0) {
    try {
      sendCpcReply(null);
    } catch (Exception failure) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
                "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_exception__WARNING", new Object[] { failure });
      }
    }
    return;
  }

  Iterator mappings = destinationMap.entrySet().iterator();
  while (mappings.hasNext()) {
    Endpoint delegateEndpoint = (Endpoint) ((Map.Entry) mappings.next()).getValue();
    if (delegateEndpoint == null || delegateEndpoint.getStatus() != Endpoint.OK) {
      continue;
    }
    if (delegateEndpoint.isRemote()) {
      getOutputChannel().sendRequest(AsynchAEMessage.CollectionProcessComplete, delegateEndpoint);
      delegateEndpoint.startCollectionProcessCompleteTimer();
      continue;
    }
    try {
      // Non-remote delegate: dispatch the CPC request over the internal (non-jms) transport
      UimaTransport transport = getTransport(delegateEndpoint.getEndpoint());
      UimaMessage request = transport.produceMessage(AsynchAEMessage.CollectionProcessComplete,
              AsynchAEMessage.Request, getName());
      transport.getUimaMessageDispatcher(delegateEndpoint.getEndpoint()).dispatch(request);
    } catch (Exception failure) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
                "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_exception__WARNING", new Object[] { failure });
      }
    }
  }
}
/**
 * Returns the descriptor most recently given to {@code setDescriptor()}.
 *
 * @return the descriptor, or {@code null} if none has been set
 */
public String getDescriptor() {
  return this.descriptor;
}
/**
 * Records the descriptor of this controller.
 *
 * @param descriptor
 *          descriptor to record
 */
public void setDescriptor(String descriptor) {
  final String newDescriptor = descriptor;
  this.descriptor = newDescriptor;
}
/**
 * Tells whether this controller is a primitive one.
 *
 * @return always {@code false} for this class
 */
public boolean isPrimitive() {
  final boolean primitive = false;
  return primitive;
}
/**
 * Returns the destination map (delegate key to endpoint) adopted in
 * {@code mapEndpointsToKeys()}.
 *
 * @return the live destination map itself, not a copy; {@code null} until a map has been set
 */
public Map getDestinations() {
  return this.destinationMap;
}
/**
 * Currently a no-op: re-enabling delegates is not implemented. The call that would hand the
 * delegates back to the flow controller container ({@code addAnalysisEngines}) is disabled.
 *
 * @param aDelegateList
 *          delegates to enable; ignored
 * @throws AsynchAEException
 *           declared for callers; never thrown by this body
 */
public void enableDelegates(List aDelegateList) throws AsynchAEException {
  // Intentionally empty, see the method comment
}
/**
 * Reacts to a failure during initialization: the exception is logged at WARNING level,
 * listeners are notified of the initialization status with the exception, and the base class
 * {@code stop()} is called.
 *
 * @param ex
 *          exception that caused initialization to fail
 */
public void handleInitializationError(Exception ex) {
  final boolean warningEnabled = UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING);
  if (warningEnabled) {
    Object[] logArgs = new Object[] { ex };
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
            "handleInitializationError", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_exception__WARNING", logArgs);
  }
  // A problem in completeInitialization() is a reason to stop
  notifyListenersWithInitializationStatus(ex);
  super.stop();
}
/**
 * Stops the listener attached to an endpoint. The destination name is taken from the endpoint's
 * destination when it has one, otherwise from its reply-to endpoint. Nothing is destroyed when
 * no input channel is found for that name.
 *
 * @param key
 *          delegate key, passed on to {@code destroyListener()}
 * @param endpoint
 *          endpoint whose listener should be stopped
 * @throws Exception
 *           propagated from the input channel calls
 */
private void stopListener(String key, Endpoint endpoint) throws Exception {
  final String destName;
  if (endpoint.getDestination() == null) {
    System.out.println("Controller:" + getComponentName()
            + "-Stopping Listener Thread on Endpoint:" + endpoint.getReplyToEndpoint());
    destName = endpoint.getReplyToEndpoint();
  } else {
    System.out.println("Controller:" + getComponentName()
            + "-Stopping Listener Thread on Endpoint:" + endpoint.getDestination());
    destName = endpoint.getDestination().toString();
  }
  InputChannel channel = getInputChannel(destName);
  if (channel == null) {
    return;
  }
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "stopListener",
            UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stopping_listener__INFO",
            new Object[] { getComponentName(), destName, key });
  }
  channel.destroyListener(destName, key);
}
/**
 * Disables the given delegates without reference to a particular CAS. Forwards to the two
 * argument variant with a {@code null} CAS reference id.
 *
 * @param aDelegateList
 *          keys of the delegates to disable
 * @throws AsynchAEException
 *           propagated from the two argument variant
 */
public void disableDelegates(List aDelegateList) throws AsynchAEException {
  final String noCasReferenceId = null;
  disableDelegates(aDelegateList, noCasReferenceId);
}
/**
* Disables the given delegates. For each key: a remote delegate has its timer cancelled, its
* listener stopped and its endpoint status set to DISABLED; the delegate's service info state
* is set to DISABLED; and the key is added to the disabled delegate list. Afterwards the
* delegates are removed from the flow controller. If this controller is not yet initialized
* and all type systems are merged, initialization is completed here.
*
* @param aDelegateList
* - keys (Strings) of the delegates to disable; must not be null
* @param aCasReferenceId
* - id of the CAS being processed when the delegates are disabled, or null. Only used
* when the flow controller fails, to pick which terminate call to make
* @throws AsynchAEException
* - if the list is null, or wrapping any exception raised while disabling
*/
protected void disableDelegates(List aDelegateList, String aCasReferenceId)
throws AsynchAEException {
if (aDelegateList == null) {
throw new AsynchAEException("Controller:" + getComponentName()
+ " Unable To Disable a Delegate. The Delegate List Provided Is Invalid (Null)");
}
try {
Iterator it = aDelegateList.iterator();
while (it.hasNext()) {
String key = (String) it.next();
Endpoint endpoint = lookUpEndpoint(key, false);
// if the current delegate is remote, destroy its listener
if (endpoint != null && endpoint.isRemote()) {
Delegate delegate = lookupDelegate(key);
if (delegate != null) {
delegate.cancelDelegateTimer();
}
stopListener(key, endpoint);
endpoint.setStatus(Endpoint.DISABLED);
}
System.out.println("Controller:" + getComponentName() + " Disabled Delegate:" + key
+ " Due to Excessive Errors");
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
"disableDelegates", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_disable_endpoint__INFO", new Object[] { getComponentName(), key });
}
// Change state of the delegate
ServiceInfo sf = getDelegateServiceInfo(key);
if (sf != null) {
sf.setState(ServiceState.DISABLED.name());
}
// The list is a plain ArrayList, so additions are made under its own lock
synchronized (disabledDelegateList) {
disabledDelegateList.add(key);
}
}
if (flowControllerContainer != null) {
try {
// Calls into the flow controller are serialized on the container
synchronized (flowControllerContainer) {
flowControllerContainer.removeAnalysisEngines(aDelegateList);
}
} catch (Exception ex) {
// The flow controller failed while removing the delegates: log, terminate and return
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"disableDelegates", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { ex });
}
if (aCasReferenceId != null) {
// With a CAS in hand, terminate with reference to its top ancestor when one is found
CasStateEntry parentCasCacheEntry = getLocalCache().getTopCasAncestor(aCasReferenceId);
if (parentCasCacheEntry != null && aDelegateList.size() > 0) {
// Only the first delegate key is reported in the message
String delegateKey = (String) aDelegateList.get(0);
System.out.println("Controller:" + getComponentName()
+ " Terminating Due to FlowController Failure While Disabling Delegate:"
+ delegateKey + " Cas:" + parentCasCacheEntry.getCasReferenceId());
super.terminate(ex, parentCasCacheEntry.getCasReferenceId());
} else {
terminate();
}
} else {
terminate();
}
return;
}
}
// Disabling a delegate during start-up may be what allows initialization to complete
if (!initialized && allTypeSystemsMerged()) {
try {
completeInitialization();
} catch (ResourceInitializationException ex) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"disableDelegates", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { ex });
}
handleInitializationError(ex);
return;
}
}
} catch (Exception e) {
// Anything not handled above is logged and reported to the caller
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"disableDelegates", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
throw new AsynchAEException(e);
}
}
/**
 * Asks the flow cached for a CAS whether processing may continue after a delegate failed. The
 * flow is consulted while holding the lock on the flow controller container.
 *
 * @param aCasReferenceId
 *          reference id of the CAS
 * @param aDelegateKey
 *          key of the delegate that failed
 * @param anException
 *          the failure
 * @return the answer given by the flow's {@code continueOnFailure()}; {@code false} when either
 *         id is {@code null} or no flow is cached for the CAS
 * @throws AsynchAEException
 *           wrapping any exception raised while looking up or consulting the flow
 */
public boolean continueOnError(String aCasReferenceId, String aDelegateKey, Exception anException)
        throws AsynchAEException {
  if (aCasReferenceId == null || aDelegateKey == null) {
    return false;
  }
  try {
    FlowContainer cachedFlow = lookupFlow(aCasReferenceId);
    if (cachedFlow != null) {
      synchronized (flowControllerContainer) {
        return cachedFlow.continueOnFailure(aDelegateKey, anException);
      }
    }
    return false;
  } catch (Exception failure) {
    throw new AsynchAEException(failure);
  }
}
/**
 * Fetches the flow cached for a CAS, logging the retrieval at FINEST level.
 *
 * @param aCasReferenceId
 *          reference id of the CAS
 * @return the cached flow, or {@code null} when the flow cache is absent or holds no entry for
 *         the id
 */
private FlowContainer lookupFlow(String aCasReferenceId) {
  if (flowMap == null || !flowMap.containsKey(aCasReferenceId)) {
    return null;
  }
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
            "lookupFlow", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_retrieve_flow_object__FINEST",
            new Object[] { getComponentName(), aCasReferenceId });
  }
  return (FlowContainer) flowMap.get(aCasReferenceId);
}
/**
 * Not implemented: no delegate key is derived from the flow.
 *
 * @param anInputCasReferenceId
 *          reference id of the input CAS; ignored
 * @return always {@code null}
 */
public String getLastDelegateKeyFromFlow(String anInputCasReferenceId) {
  final String noDelegateKey = null;
  return noDelegateKey;
}
/**
* This routine is called to handle CASes produced by a CAS Multiplier. A new CAS needs a flow
* object which is produced here from the Flow associated with the input CAS. Once the subflow is
* computed, it is cached for future use and the first step of the new flow is executed.
*
* No exception is propagated to the caller. A failure while computing or caching the subflow
* sends a reply carrying a ServiceShutdownException for the input CAS and triggers the
* TERMINATE action; a failure while executing the flow step is passed to handleError().
*
* @param aCAS
* - CAS to process
* @param anInputCasReferenceId
* - reference id of the input CAS
* @param aNewCasReferenceId
* - reference id of the CAS created by the CAS multiplier
* @param newCASProducedBy
* - name of the multiplier that created the CAS
*/
public void process(CAS aCAS, String anInputCasReferenceId, String aNewCasReferenceId,
String newCASProducedBy) // throws AnalysisEngineProcessException, AsynchAEException
{
FlowContainer flow = null;
try {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_lookup_flow__FINE",
new Object[] { getComponentName(), anInputCasReferenceId });
}
try {
// Lookup a Flow object associated with an input CAS.
if (flowMap.containsKey(anInputCasReferenceId)) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_retrieve_flow_object__FINEST",
new Object[] { getComponentName(), anInputCasReferenceId });
}
// Retrieve the input CAS Flow object from the flow cache. This Flow object will be used
// to compute the subordinate Flow for the new CAS.
flow = (FlowContainer) flowMap.get(anInputCasReferenceId);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_retrieved_flow_object_ok__FINEST",
new Object[] { getComponentName(), anInputCasReferenceId });
}
}
if (flow != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_lookup_flow_ok__FINE",
new Object[] { getComponentName(), aNewCasReferenceId, newCASProducedBy,
anInputCasReferenceId, });
}
// Compute subordinate Flow from the Flow associated with the
// input CAS. From here on 'flow' refers to the new CAS's flow, not the input CAS's.
synchronized (flowControllerContainer) {
flow = flow.newCasProduced(aCAS, newCASProducedBy);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_new_flow_ok__FINE",
new Object[] { getComponentName(), aNewCasReferenceId, newCASProducedBy,
anInputCasReferenceId, });
}
// Check if the local cache already contains an entry for the Cas id.
// A colocated Cas Multiplier may have already registered this CAS
// in the parent's controller
if (localCache.lookupEntry(aNewCasReferenceId) == null) {
// Add this Cas Id to the local cache and link it to its input CAS
CasStateEntry casStateEntry = localCache.createCasStateEntry(aNewCasReferenceId);
casStateEntry.setInputCasReferenceId(anInputCasReferenceId);
}
// Save the subordinate Flow Object in a cache. Flow exists in the
// cache until the CAS is fully processed or it is
// explicitly deleted when processing of this CAS cannot continue
flowMap.put(aNewCasReferenceId, flow);
// Register the fact that this is a new CAS and the fact that it was produced
// by this aggregate. It is important to register this to determine how to
// handle the CAS in delegate Aggregate services. When the CAS is processed
// in the Delegate Aggregate, the CAS produced in the parent Aggregate cannot
// be dropped in the delegate. Check Final Step logic.
getInProcessCache().getCacheEntryForCAS(aNewCasReferenceId).setNewCas(true,
getComponentName());
} else {
// No flow cached for the input CAS: handled by the catch (Throwable) below
throw new AsynchAEException(
"Flow Object Not In Flow Cache. Expected Flow Object in FlowCache for Cas Reference Id:"
+ anInputCasReferenceId);
}
} catch (Throwable t) {
// Any error here is automatic termination
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
new Object[] { t });
}
// The reply is sent for the input CAS, then the controller is asked to terminate
sendReplyWithShutdownException(anInputCasReferenceId);
handleAction(ErrorHandler.TERMINATE, null, null);
return;
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
CLASS_NAME.getName(),
"process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_executing_step__FINEST",
new Object[] { getComponentName(), aNewCasReferenceId, newCASProducedBy,
anInputCasReferenceId, });
}
// Continue with Steps. The CAS has been produced by the CAS Multiplier
executeFlowStep(flow, aNewCasReferenceId, true);
} catch (Exception e) {
// Hand the failure to the error handling machinery, tagged as a Process command
HashMap map = new HashMap();
map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
handleError(map, e);
}
}
/**
 * Sends a reply carrying a {@code ServiceShutdownException} for the given CAS to its reply
 * endpoint, if one can be determined. Any exception raised on the way is logged at WARNING
 * level and not propagated.
 *
 * @param aCasReferenceId
 *          reference id of the CAS the reply is for
 */
private void sendReplyWithShutdownException(String aCasReferenceId) {
  try {
    CasStateEntry stateEntry = localCache.createCasStateEntry(aCasReferenceId);
    CacheEntry cachedCas = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
    Endpoint replyTarget = getReplyEndpoint(cachedCas, stateEntry);
    if (replyTarget == null) {
      return;
    }
    getOutputChannel().sendReply(new ServiceShutdownException(), aCasReferenceId, null,
            replyTarget, AsynchAEMessage.Process);
  } catch (Exception failure) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      // The method name logged here is "process", the caller of this helper
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process",
              UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
              new Object[] { failure });
    }
  }
}
/**
 * Checks whether a CAS must be abandoned because of an earlier failure and, if so, forces it
 * into the final step.
 * <p>
 * For a subordinate CAS the decision is based on the failed flag of its parent (looked up in
 * the local cache via the input CAS reference id). Before the child is moved to the final step,
 * the Cas Multiplier delegate that produced it is notified: a remote CM receives a ReleaseCAS
 * request carrying the Stop command, and a CM still generating children from the parent is asked
 * to stop. For a non-subordinate CAS the decision is based on its own failed flag.
 *
 * @param casStateEntry
 *          local cache entry of the CAS being examined
 * @param entry
 *          global cache entry of the same CAS
 * @return {@code true} if the CAS was sent to the final step; {@code false} otherwise, including
 *         when an exception was caught (it is logged at WARNING level)
 */
private boolean abortProcessingCas(CasStateEntry casStateEntry, CacheEntry entry) {
  try {
    boolean abort;
    if (casStateEntry.isSubordinate()) {
      // The parent is marked as failed when one of its children failed and error handling
      // was configured to fail the CAS. All children are then dropped in finalStep() as they
      // come back from delegates; once all are accounted for, the parent is returned to the
      // client with an exception.
      CasStateEntry parentState = getLocalCache().lookupEntry(
              casStateEntry.getInputCasReferenceId());
      abort = parentState.isFailed();
      if (abort) {
        // The producer key is associated with the cache entry in the ProcessRequestHandler;
        // each new CAS must carry the key of the CM that produced it.
        Delegate producer = lookupDelegate(entry.getCasProducerKey());
        if (producer != null && producer.getEndpoint().isCasMultiplier()) {
          if (producer.getEndpoint().isRemote()) {
            // Remote CM: send a Free CAS notification
            parentState.getFreeCasNotificationEndpoint().setCommand(AsynchAEMessage.Stop);
            getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, entry.getCasReferenceId(),
                    parentState.getFreeCasNotificationEndpoint());
          }
          // The Delegate tracks STOP requests already sent to the CM; only one is needed.
          if (producer.isGeneratingChildrenFrom(parentState.getCasReferenceId())) {
            stopCasMultiplier(producer, parentState.getCasReferenceId());
          }
        }
      }
    } else {
      abort = casStateEntry.isFailed();
    }
    if (abort) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.FINE,
                CLASS_NAME.getName(),
                "abortProcessingCas",
                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_forcing_cas_to_finalstep__FINE",
                new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),
                    casStateEntry.getSubordinateCasInPlayCount() });
      }
      casStateEntry.setReplyReceived();
      // Force the CAS into the final step
      finalStep(new FinalStep(), casStateEntry.getCasReferenceId());
      return true;
    }
  } catch (Exception failure) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
              "abortProcessingCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_exception__WARNING", new Object[] { failure });
    }
  }
  return false;
}
/**
 * This is a process method that is executed for CASes not created by a Multiplier in this
 * aggregate.
 * <p>
 * Unless the controller is stopped, the method first:
 * <ol>
 * <li>stops the delegate Cas Multipliers when {@code abortGeneratingCASes()} returns true,</li>
 * <li>returns if {@code abortProcessingCas()} moved the CAS to the final step,</li>
 * <li>returns with the CAS put in {@code FINAL_STATE} when the last delegate was a Cas
 * Multiplier configured to process the parent last and child CASes are still in play,</li>
 * <li>dispatches the CAS to the next endpoint of the delayed single step list, if the list is
 * not empty, and returns.</li>
 * </ol>
 * Otherwise the flow for the CAS is taken from {@code flowMap}, or computed and stored there,
 * and the next flow step is executed. A failure while obtaining the flow sends a shutdown
 * exception to the client and triggers the TERMINATE action.
 *
 * @param aCAS
 *          the CAS used to compute a new flow; only read when no flow is cached for the id
 *          ({@code parallelStep()} passes {@code null} here)
 * @param aCasReferenceId
 *          id of the CAS; when {@code null} no flow step is executed
 */
public void process(CAS aCAS, String aCasReferenceId) {
  boolean handlingDelayedStep = false;
  // First check if there are outstanding steps to be called before consulting the Flow
  // Controller. This could be the case if a previous step was a parallel step and it
  // contained collocated delegates.
  if (!isStopped()) {
    if (abortGeneratingCASes(aCasReferenceId)) {
      // Force delegate Cas Multipliers to Stop generating new CASes
      super.stopCasMultipliers();
    }
    try {
      CacheEntry entry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
      CasStateEntry casStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
      // Check if this CAS should be aborted due to previous error on this CAS or its
      // parent. If this is the case the method will move the CAS to the final state
      // where it will be dropped. If the CAS is an input CAS, it will be returned to
      // the client with an exception
      if (abortProcessingCas(casStateEntry, entry)) {
        // This CAS was aborted, we are done here
        return;
      }
      // Check if this is an input CAS from the client. If not, check if last
      // delegate handling this CAS was a Cas Multiplier configured to process
      // parent CAS last
      if (casStateEntry.getLastDelegate() != null) {
        // Fetch the endpoint corresponding to the last Delegate handling the CAS
        Endpoint lastDelegateEndpoint = casStateEntry.getLastDelegate().getEndpoint();
        // Check if this delegate is a Cas Multiplier and the parent CAS is to be processed last
        casStateEntry.setReplyReceived();
        if (lastDelegateEndpoint.isCasMultiplier() && lastDelegateEndpoint.processParentLast()) {
          synchronized (super.finalStepMux) {
            // Determine if the CAS should be held until all its children leave this aggregate.
            if (casStateEntry.getSubordinateCasInPlayCount() > 0) {
              // This input CAS has child CASes still in play. It will remain in the cache
              // until the last of the child CASes is released. Only then is the input CAS
              // allowed to continue into the next step in the flow.
              // The CAS has to be in final state
              casStateEntry.setState(CacheEntry.FINAL_STATE);
              // The input CAS will be interned until all children leave this aggregate
              return;
            }
          }
        }
      }
      // If we are here entry is not null. The above throws an exception if an entry is not
      // found in the cache. First check if there is a delayedSingleStepList in the cache.
      // If there is one, it means that a parallel step contained collocated delegate(s).
      // The parallel step may only contain remote delegates. All collocated delegates
      // were removed from the parallel step and added to the delayedSingleStepList in
      // parallelStep() method.
      List delayedSingleStepList = entry.getDelayedSingleStepList();
      if (delayedSingleStepList != null && delayedSingleStepList.size() > 0) {
        handlingDelayedStep = true;
        // Reset number of parallel delegates back to one. This is done only if the previous step
        // was a parallel step.
        synchronized (parallelStepMux) {
          if (casStateEntry.getNumberOfParallelDelegates() > 1) {
            casStateEntry.setNumberOfParallelDelegates(1);
          }
        }
        // Remove a delegate endpoint from the single step list cached in the CAS entry
        Endpoint endpoint = (Endpoint_impl) entry.getDelayedSingleStepList().remove(0);
        // send the CAS to a collocated delegate from the delayed single step list.
        dispatchProcessRequest(aCasReferenceId, endpoint, true);
      }
    } catch (Exception e) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process",
                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
                new Object[] { e });
      }
    }
    // If just handled the delayed step, return as there is nothing else to do. This check
    // deliberately lives after the try/catch rather than in a finally block: a return inside
    // finally would discard any pending Throwable that catch (Exception) does not handle.
    if (handlingDelayedStep) {
      return;
    }
  }
  FlowContainer flow = null;
  try {
    if (aCasReferenceId != null) {
      try {
        // Check if a Flow object has been previously generated for the Cas.
        if (flowMap.containsKey(aCasReferenceId)) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                    "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_retrieve_flow_object__FINEST",
                    new Object[] { getComponentName(), aCasReferenceId });
          }
          flow = (FlowContainer) flowMap.get(aCasReferenceId);
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                    "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_retrieved_flow_object_ok__FINEST",
                    new Object[] { getComponentName(), aCasReferenceId });
          }
        } else {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                    "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_new_flow_object__FINEST", new Object[] { aCasReferenceId });
          }
          synchronized (flowControllerContainer) {
            flow = flowControllerContainer.computeFlow(aCAS);
          }
          // Save the Flow Object in a cache. Flow exists in the cache
          // until the CAS is fully processed or it is
          // explicitly deleted when processing of this CAS cannot
          // continue
          flowMap.put(aCasReferenceId, flow);
          // Check if the local cache already contains an entry for the Cas id.
          // A colocated Cas Multiplier may have already registered this CAS
          // in the parent's controller
          if (localCache.lookupEntry(aCasReferenceId) == null) {
            // Add this Cas Id to the local cache. Every input CAS goes through here
            localCache.createCasStateEntry(aCasReferenceId);
          }
        }
      } catch (Exception ex) {
        // Any error here is automatic termination
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_exception__WARNING", new Object[] { ex });
        }
        sendReplyWithShutdownException(aCasReferenceId);
        handleAction(ErrorHandler.TERMINATE, null, null);
        return;
      }
      if (!isStopped()) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                  "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_executing_step_input_cas__FINEST",
                  new Object[] { getComponentName(), aCasReferenceId });
        }
        // Execute a step in the flow. false means that this CAS has not
        // been produced by CAS Multiplier
        executeFlowStep(flow, aCasReferenceId, false);
      } else {
        // The controller was stopped in the meantime: tell the flow it is aborted
        synchronized (flowControllerContainer) {
          flow.aborted();
        }
      }
    }
  } catch (Exception e) {
    HashMap map = new HashMap();
    map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
    map.put(AsynchAEMessage.CasReference, aCasReferenceId);
    handleError(map, e);
  }
}
/**
 * Executes a simple (single delegate) flow step for the given CAS.
 * <p>
 * The delegate endpoint is looked up by the analysis engine key of the step; when no endpoint is
 * found the method does nothing. For a Cas Multiplier delegate the CAS is registered as the one
 * children are generated from, and, for a non-remote CM, added to the delegate's outstanding
 * list. The number of parallel delegates is reset to one and, unless the controller is stopped,
 * the CAS is dispatched to the endpoint. Any {@link Throwable} is routed to
 * {@code handleError()} together with the command, the CAS id and the endpoint (if known).
 *
 * @param aStep
 *          the step naming the delegate to call
 * @param aCasReferenceId
 *          id of the CAS to dispatch
 */
private void simpleStep(SimpleStep aStep, String aCasReferenceId) {
  Endpoint endpoint = null;
  try {
    String delegateKey = aStep.getAnalysisEngineKey();
    // Find the endpoint for the delegate
    endpoint = lookUpEndpoint(delegateKey, true);
    if (endpoint == null) {
      return;
    }
    endpoint.setController(this);
    CasStateEntry stateEntry = getLocalCache().lookupEntry(aCasReferenceId);
    if (endpoint.isCasMultiplier()) {
      Delegate multiplier = lookupDelegate(delegateKey);
      multiplier.setGeneratingChildrenFrom(aCasReferenceId, true);
      // Record the outgoing CAS. CASes destined for remote CM are recorded
      // in JmsOutputchannel.
      if (!endpoint.isRemote()) {
        multiplier.addNewCasToOutstandingList(aCasReferenceId, true);
      }
    }
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
              "simpleStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_next_step__FINEST", new Object[] { delegateKey, aCasReferenceId });
    }
    // A preceding parallel step may have left the delegate count above one; bring it back.
    synchronized (parallelStepMux) {
      if (stateEntry.getNumberOfParallelDelegates() > 1) {
        stateEntry.setNumberOfParallelDelegates(1);
      }
    }
    if (isStopped()) {
      return;
    }
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
              "simpleStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_next_step_dispatch__FINEST",
              new Object[] { getComponentName(), aCasReferenceId, delegateKey });
    }
    dispatchProcessRequest(aCasReferenceId, endpoint, true);
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
              "simpleStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_next_step_dispatch_completed__FINEST",
              new Object[] { getComponentName(), aCasReferenceId, delegateKey });
    }
  } catch (Throwable failure) {
    HashMap errorContext = new HashMap();
    errorContext.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
    errorContext.put(AsynchAEMessage.CasReference, aCasReferenceId);
    if (endpoint != null) {
      errorContext.put(AsynchAEMessage.Endpoint, endpoint);
    }
    handleError(errorContext, failure);
  }
}
/**
 * Executes a parallel flow step for the given CAS.
 * <p>
 * Only remote delegates are dispatched in parallel. The delegates named by the step are split
 * into two lists by location: remote endpoints go to the parallel list, co-located endpoints go
 * to a single step list that is stored in the CAS cache entry and worked off sequentially by
 * {@code process()}. When there is at least one remote delegate the CAS is dispatched to all of
 * them; otherwise {@code process()} is called right away to start with the first co-located
 * delegate. Any {@link Throwable} is routed to {@code handleError()}.
 *
 * @param aStep
 *          the step naming the delegates to call
 * @param aCasReferenceId
 *          id of the CAS to dispatch
 * @throws AsynchAEException
 *           declared for callers; failures raised in the body are passed to
 *           {@code handleError()}
 */
private void parallelStep(ParallelStep aStep, String aCasReferenceId) throws AsynchAEException {
  try {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "parallelStep",
              UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_parallel_step__FINE",
              new Object[] { getComponentName(), aCasReferenceId });
    }
    Collection keyList = aStep.getAnalysisEngineKeys();
    String[] analysisEngineKeys = new String[keyList.size()];
    keyList.toArray(analysisEngineKeys);
    List parallelDelegateList = new ArrayList();
    List singleStepDelegateList = null;
    // Only remote delegates can be in a parallel step. Iterate over the
    // delegates in parallel step and assign each to a different list based on location.
    // Remote delegates are assigned to parallelDelegateList, whereas co-located
    // delegates are assigned to singleStepDelegateList. Those delegates
    // assigned to the singleStepDelegateList will be executed sequentially
    // once all parallel delegates respond.
    for (int i = 0; i < analysisEngineKeys.length; i++) {
      // Fetch an endpoint corresponding to a given delegate key
      Endpoint endpoint = lookUpEndpoint(analysisEngineKeys[i], true);
      endpoint.setController(this);
      // Assign delegate to appropriate list
      if (endpoint.isRemote()) {
        parallelDelegateList.add(endpoint);
      } else {
        if (singleStepDelegateList == null) {
          singleStepDelegateList = new ArrayList();
        }
        singleStepDelegateList.add(endpoint);
        // Guard with the same logger that writes the message
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                  "parallelStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_move_to_single_step_list__FINE",
                  new Object[] { getComponentName(), analysisEngineKeys[i], aCasReferenceId });
        }
      }
    }
    // Fetch cache entry for a given CAS id
    CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
    CasStateEntry casStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
    // Add all co-located delegates to the cache. These delegates will be called
    // sequentially once all parallel delegates respond
    if (singleStepDelegateList != null) {
      cacheEntry.setDelayedSingleStepList(singleStepDelegateList);
    }
    // Check if there are any delegates in the parallel step. It is possible that
    // all of the delegates were co-located and thus the parallel delegate list
    // is empty.
    if (parallelDelegateList.size() > 0) {
      // Create endpoint array to contain as many slots as there are parallel delegates
      Endpoint[] endpoints = new Endpoint_impl[parallelDelegateList.size()];
      // Copy parallel delegate endpoints to the array
      parallelDelegateList.toArray(endpoints);
      synchronized (parallelStepMux) {
        casStateEntry.resetDelegateResponded();
        // Set number of delegates in the parallel step
        casStateEntry.setNumberOfParallelDelegates(endpoints.length);
      }
      // Dispatch CAS to remote parallel delegates
      dispatchProcessRequest(aCasReferenceId, endpoints, true);
    } else {
      // All delegates in a parallel step are co-located. Send the CAS
      // to the first delegate in the single step list.
      process(null, aCasReferenceId);
    }
  } catch (Throwable e) {
    HashMap map = new HashMap();
    map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
    map.put(AsynchAEMessage.CasReference, aCasReferenceId);
    handleError(map, e);
  }
}
/**
 * Sends a GetMeta request to every delegate of this aggregate.
 * <p>
 * Child controllers that are aggregates are asked to do the same for their own delegates first.
 * Then, for each endpoint in {@code destinationMap}:
 * <ul>
 * <li>a remote endpoint is initialized, its service info, performance and error MBeans are
 * registered, and, if its status is {@code Endpoint.OK}, a metadata request is dispatched;</li>
 * <li>a co-located endpoint is initialized, marked as waiting for a response and sent a GetMeta
 * request message through its transport.</li>
 * </ul>
 * Processing ends early as soon as the controller is found to be stopped.
 *
 * @throws AsynchAEException
 *           if sending the request to a co-located delegate fails
 */
public void sendRequestForMetadataToRemoteDelegates() throws AsynchAEException {
  synchronized (childControllerList) {
    for (AnalysisEngineController childController : childControllerList) {
      if (childController instanceof AggregateAnalysisEngineController) {
        ((AggregateAnalysisEngineController) childController)
                .sendRequestForMetadataToRemoteDelegates();
      }
    }
  }
  // Take a snapshot of the endpoints so that we dont get Concurrent access problems
  // in case an error handler needs to disable the endpoint. A single toArray() call sizes
  // and fills the array in one step, so a map that changes concurrently cannot leave unfilled
  // slots or overflow an array that was sized earlier.
  Endpoint[] delegateEndpoints = (Endpoint[]) destinationMap.values().toArray(new Endpoint[0]);
  // Now send GetMeta request to all delegates
  for (int i = 0; !isStopped() && i < delegateEndpoints.length; i++) {
    if (delegateEndpoints[i].isRemote()) {
      delegateEndpoints[i].initialize();
      delegateEndpoints[i].setController(this);
      String key = lookUpDelegateKey(delegateEndpoints[i].getEndpoint());
      if (key != null && destinationMap.containsKey(key)) {
        Endpoint endpoint = ((Endpoint) destinationMap.get(key));
        if (endpoint != null) {
          ServiceInfo serviceInfo = endpoint.getServiceInfo();
          PrimitiveServiceInfo pServiceInfo = new PrimitiveServiceInfo(endpoint.isCasMultiplier(), null);
          pServiceInfo.setBrokerURL(serviceInfo.getBrokerURL());
          pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName());
          if (endpoint.getDestination() != null) {
            pServiceInfo.setReplyQueueName(endpoint.getDestination().toString());
          }
          pServiceInfo.setServiceKey(key);
          pServiceInfo.setState(serviceInfo.getState());
          pServiceInfo.setAnalysisEngineInstanceCount(1);
          // Common JMX name prefix shared by the three MBeans of this remote delegate
          String jmxNamePrefix = super.getManagementInterface().getJmxDomain()
                  + super.jmxContext + ",r" + remoteIndex + "=" + key
                  + " [Remote Uima EE Service],name=" + key + "_";
          registerWithAgent(pServiceInfo, jmxNamePrefix + serviceInfo.getLabel());
          ServicePerformance servicePerformance = new ServicePerformance();
          servicePerformance.setRemoteDelegate();
          registerWithAgent(servicePerformance, jmxNamePrefix + servicePerformance.getLabel());
          ServiceErrors serviceErrors = new ServiceErrors();
          registerWithAgent(serviceErrors, jmxNamePrefix + serviceErrors.getLabel());
          remoteIndex++;
          serviceErrorMap.put(key, serviceErrors);
          Object[] delegateStatsArray = new Object[] { pServiceInfo, servicePerformance,
              serviceErrors };
          delegateStatMap.put(key, delegateStatsArray);
        }
        // If the service has stopped dont bother doing anything else. The service
        // may have been stopped because listener connection could not be established.
        if (isStopped()) {
          return;
        }
        if (delegateEndpoints[i].getStatus() == Endpoint.OK) {
          dispatchMetadataRequest(delegateEndpoints[i]);
        }
      }
    } else {
      // collocated delegate
      delegateEndpoints[i].initialize();
      delegateEndpoints[i].setController(this);
      delegateEndpoints[i].setWaitingForResponse(true);
      try {
        String delegateName = delegateEndpoints[i].getEndpoint();
        UimaTransport transport = getTransport(delegateName);
        UimaMessage message = transport.produceMessage(AsynchAEMessage.GetMeta,
                AsynchAEMessage.Request, getName());
        transport.getUimaMessageDispatcher(delegateName).dispatch(message);
      } catch (Exception e) {
        throw new AsynchAEException(e);
      }
    }
  }
}
/**
 * Looks up the parent of a CAS in the local cache.
 * <p>
 * The parent is identified by the input CAS reference id of the given entry. When the lookup
 * yields {@code null} an INFO message naming the missing parent id is logged.
 *
 * @param casStateEntry
 *          local cache entry of the child CAS
 * @return the parent's local cache entry, or {@code null} if the lookup returned none
 * @throws Exception
 *           if the local cache lookup fails
 */
private CasStateEntry fetchParentCasFromLocalCache(CasStateEntry casStateEntry) throws Exception {
  // Lookup parent CAS in the local cache
  CasStateEntry parentCasStateEntry = localCache.lookupEntry(casStateEntry
          .getInputCasReferenceId());
  if (parentCasStateEntry == null) {
    // Guard with the same logger that writes the message
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
      UIMAFramework.getLogger(CLASS_NAME)
              .logrb(
                      Level.INFO,
                      CLASS_NAME.getName(),
                      "fetchParentCas",
                      UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAEE_cas_not_found__INFO",
                      // Report the id that was actually looked up (the parent), as the
                      // global cache variant of this method does
                      new Object[] { getComponentName(), casStateEntry.getInputCasReferenceId(),
                          "Local Cache" });
    }
  }
  return parentCasStateEntry;
}
/**
 * Fetches the parent of a CAS from the global (in-process) cache.
 * <p>
 * The parent is identified by the input CAS reference id of the given entry. A failed lookup is
 * treated as "not found": it is logged at INFO level and {@code null} is returned.
 *
 * @param casStateEntry
 *          local cache entry of the child CAS
 * @return the parent's global cache entry, or {@code null} if the lookup threw an exception
 * @throws Exception
 *           declared for callers; lookup failures are caught and logged here
 */
private CacheEntry fetchParentCasFromGlobalCache(CasStateEntry casStateEntry) throws Exception {
  CacheEntry parentCASCacheEntry = null;
  try {
    // Fetch the parent Cas cache entry
    parentCASCacheEntry = getInProcessCache().getCacheEntryForCAS(
            casStateEntry.getInputCasReferenceId());
  } catch (Exception ex) {
    // Deliberately not rethrown: the caller receives null for a missing parent.
    // Guard with the same logger that writes the message.
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(
              Level.INFO,
              CLASS_NAME.getName(),
              "fetchParentCas",
              UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_cas_not_found__INFO",
              new Object[] { getComponentName(), casStateEntry.getInputCasReferenceId(),
                  "InProcessCache" });
    }
  }
  return parentCASCacheEntry;
}
/**
 * Tells whether a CAS still has child CASes in play and, if so, marks it as pending reply.
 *
 * @param casStateEntry
 *          local cache entry of the CAS to check
 * @return {@code true} if the subordinate CAS in-play count is greater than zero (the entry is
 *         then flagged with {@code setPendingReply(true)}); {@code false} otherwise
 * @throws Exception
 *           declared for callers
 */
private boolean casHasChildrenInPlay(CasStateEntry casStateEntry) throws Exception {
  if (casStateEntry.getSubordinateCasInPlayCount() > 0) {
    // This CAS has child CASes still in play. This CAS will remain in the cache
    // until all its children are fully processed.
    // Guard with the same logger that writes the message.
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(
              Level.FINEST,
              CLASS_NAME.getName(),
              "casHasChildrenInPlay",
              UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_final_step_parent_cas_child_count__FINEST",
              new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),
                  casStateEntry.getSubordinateCasInPlayCount() });
    }
    // Leave input CAS in pending state. It will be returned to the client
    // *only* if the last subordinate CAS is fully processed.
    casStateEntry.setPendingReply(true);
    // Done here. There are subordinate CASes still being processed.
    return true;
  }
  return false;
}
/**
 * Executes the final step of the flow for the given CAS.
 * <p>
 * The CAS is first marked as being in {@code FINAL_STATE}. Then, while holding
 * {@code finalStepMux}:
 * <ul>
 * <li>if the CAS still has children in play, it is left pending and the method returns;</li>
 * <li>if {@code forceToDropTheCas()} says so and the reply for the CAS has been received, a
 * subordinate CAS is dropped together with its flow;</li>
 * <li>otherwise, unless already dropped, the CAS is returned to the client via
 * {@code replyToClient()};</li>
 * <li>for a subordinate CAS the parent's count of children in play is decremented and, when
 * {@code releaseParentCas()} allows it, the parent is resumed either through
 * {@code process()} or through a nested call to this method.</li>
 * </ul>
 * Finally, statistics and message origin for the CAS are cleaned up and, if the CAS was dropped
 * and a Free CAS endpoint is known, a ReleaseCAS request is sent to the Cas Multiplier.
 *
 * @param aStep
 *          the final step produced by the flow
 * @param aCasReferenceId
 *          id of the CAS that reached the final step
 */
public void finalStep(FinalStep aStep, String aCasReferenceId) {
  Endpoint endpoint = null;
  boolean replySentToClient = false;
  boolean isSubordinate = false;
  CacheEntry cacheEntry = null;
  CasStateEntry casStateEntry = null;
  CasStateEntry parentCasStateEntry = null;
  Endpoint freeCasEndpoint = null;
  CacheEntry parentCASCacheEntry = null;
  Endpoint cEndpoint = null;
  boolean casDropped = false;
  boolean doDecrementChildCount = false;
  localCache.dumpContents();
  // First locate entries in the global and local cache for a given CAS
  // If not found, log a message and return
  try {
    // Get entry from the cache for a given CAS Id. This throws an exception if
    // an entry doesnt exist in the cache
    cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
    casStateEntry = localCache.lookupEntry(aCasReferenceId);
    if (casStateEntry.getState() != CacheEntry.FINAL_STATE) {
      // Mark the entry to indicate that the CAS reached a final step. This CAS
      // may still have children and will not be returned to the client until
      // all of them are fully processed. This state info will aid in the
      // internal bookkeeping when the final child is processed.
      casStateEntry.setFinalStep(aStep);
      casStateEntry.setState(CacheEntry.FINAL_STATE);
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.FINE,
                CLASS_NAME.getName(),
                "finalStep",
                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_cas_in_finalstep__FINE",
                new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),
                    casStateEntry.getSubordinateCasInPlayCount() });
      }
    }
  } catch (Exception e) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "finalStep",
              UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
              new Object[] { e });
    }
    return;
  }
  // Found entries in caches for a given CAS id
  try {
    endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
    synchronized (super.finalStepMux) {
      // Check if the global cache still contains the CAS. It may have been deleted by another
      // thread already
      if (!getInProcessCache().entryExists(aCasReferenceId)) {
        return;
      }
      // Check if this CAS has children that are still being processed in this aggregate
      if (casHasChildrenInPlay(casStateEntry)) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(
                  Level.FINE,
                  CLASS_NAME.getName(),
                  "finalStep",
                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_cas_has_children__FINE",
                  new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),
                      casStateEntry.getCasReferenceId(),
                      casStateEntry.getSubordinateCasInPlayCount() });
        }
        replySentToClient = false;
        return;
      }
      // Guard with the same logger that writes the message
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_final_step_parent_cas_no_children__FINEST",
                new Object[] { getComponentName(), aCasReferenceId });
      }
      // Determine if this CAS is a child of some CAS
      isSubordinate = casStateEntry.getInputCasReferenceId() != null;
      if (isSubordinate) {
        // fetch the destination of a CM that produced this CAS, so that we know where to send
        // Free Cas Notification
        freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
        parentCasStateEntry = fetchParentCasFromLocalCache(casStateEntry);
        parentCASCacheEntry = fetchParentCasFromGlobalCache(casStateEntry);
        doDecrementChildCount = true;
      }
      // If the CAS was generated by this component but the Flow Controller wants to drop it OR
      // this component is not a Cas Multiplier
      if (forceToDropTheCas(parentCasStateEntry, cacheEntry, aStep)) {
        if (casStateEntry.isReplyReceived()) {
          if (isSubordinate) {
            // drop the flow since we no longer need it
            dropFlow(aCasReferenceId, true);
            // Drop the CAS and remove cache entry for it
            dropCAS(aCasReferenceId, true);
            casDropped = true;
            // If debug level=FINEST dump the entire cache
            localCache.dumpContents();
            // Set this state as if we sent the reply to the client. This triggers a cleanup of
            // origin map and stats for the current cas
            if (isTopLevelComponent()) {
              replySentToClient = true;
            }
          }
        } else {
          // The reply for this CAS has not been received yet; do not touch the parent's count
          doDecrementChildCount = false;
        }
      } else if (!casStateEntry.isDropped()) {
        casStateEntry.setWaitingForRelease(true);
        // Send a reply to the Client. If the CAS is an input CAS it will be dropped
        cEndpoint = replyToClient(cacheEntry, casStateEntry);
        replySentToClient = true;
        // NOTE(review): cEndpoint is dereferenced without a null check here although it is
        // null-checked further below - confirm replyToClient() never returns null
        if (cEndpoint.isRemote()) {
          // if this service is a Cas Multiplier don't remove the CAS. It will be removed
          // when a remote client sends explicit Release CAS Request
          if (!isCasMultiplier()) {
            // Drop the CAS and remove cache entry for it
            dropCAS(aCasReferenceId, true);
          }
          casDropped = true;
        } else {
          // Remove entry from the local cache for this CAS. If the client
          // is remote the entry was removed in replyToClient()
          try {
            localCache.lookupEntry(aCasReferenceId).setDropped(true);
          } catch (Exception ignored) {
            // Best effort only: the entry is removed from the local cache right below
          }
          localCache.remove(aCasReferenceId);
        }
        // If debug level=FINEST dump the entire cache
        localCache.dumpContents();
      }
      if (parentCasStateEntry == null && isSubordinate) {
        parentCasStateEntry = localCache.lookupEntry(casStateEntry.getInputCasReferenceId());
      }
      if (doDecrementChildCount) {
        // Child CAS has been fully processed, decrement its parent count of active child CASes
        if (parentCasStateEntry != null) {
          parentCasStateEntry.decrementSubordinateCasInPlayCount();
          // If debug level=FINEST dump the entire cache
          localCache.dumpContents();
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(
                    Level.FINE,
                    CLASS_NAME.getName(),
                    "finalStep",
                    UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_cas_decremented_child_count__FINE",
                    new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),
                        casStateEntry.getSubordinateCasInPlayCount() });
          }
        }
      }
      boolean clientIsCollocated = (cEndpoint == null || !cEndpoint.isRemote());
      if (parentCasStateEntry != null && parentCasStateEntry.getSubordinateCasInPlayCount() == 0
              && parentCasStateEntry.isFailed()) {
        parentCasStateEntry.setReplyReceived();
      }
      // For subordinate CAS, check if its parent needs to be put in play. This should happen if
      // this CAS was the last of the children in play
      if (isSubordinate && releaseParentCas(casDropped, clientIsCollocated, parentCasStateEntry)) {
        // Put the parent CAS in play. The parent CAS can be in one of two places now depending
        // on the configuration. The parent CAS could have been suspended in the final step, or
        // it could have been suspended in the process method. If the configuration indicates
        // that the parent should follow only when the last of its children leaves this
        // aggregate, call the process method.
        // Otherwise, the CAS is in a final state and simply needs to resume there.
        // NOTE(review): getLastDelegate() and parentCASCacheEntry are dereferenced without
        // null checks - confirm both are always set on this path
        Endpoint lastDelegateEndpoint = casStateEntry.getLastDelegate().getEndpoint();
        if (lastDelegateEndpoint.processParentLast()) {
          // The parent was suspended in the process call. Resume processing the CAS
          process(parentCASCacheEntry.getCas(), parentCasStateEntry.getCasReferenceId());
        } else {
          // The parent was suspended in the final step. Resume processing the CAS
          finalStep(parentCasStateEntry.getFinalStep(), parentCasStateEntry.getCasReferenceId());
        }
      }
    } // synchronized
    if (endpoint != null) {
      // remove stats associated with this Cas and a given endpoint
      dropStats(aCasReferenceId, endpoint.getEndpoint());
    }
  } catch (Exception e) {
    HashMap map = new HashMap();
    map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
    map.put(AsynchAEMessage.CasReference, aCasReferenceId);
    if (endpoint != null) {
      map.put(AsynchAEMessage.Endpoint, endpoint);
    }
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
              "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_exception__WARNING", new Object[] { e });
    }
    handleError(map, e);
  } finally {
    if (replySentToClient) {
      removeMessageOrigin(aCasReferenceId);
      dropStats(aCasReferenceId, super.getName());
    }
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(
              Level.FINEST,
              CLASS_NAME.getName(),
              "finalStep",
              UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_final_step_show_internal_stats__FINEST",
              new Object[] { getName(), flowMap.size(), getInProcessCache().getSize(),
                  originMap.size(), super.statsMap.size() });
    }
    // freeCasEndpoint is a special endpoint for sending Free CAS Notification.
    if (casDropped && freeCasEndpoint != null) {
      freeCasEndpoint.setReplyEndpoint(true);
      try {
        // send Free CAS Notification to a Cas Multiplier
        getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, aCasReferenceId,
                freeCasEndpoint);
      } catch (Exception e) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_exception__WARNING", new Object[] { e });
        }
      }
    }
  }
}
/**
 * Decides whether a parent CAS may be released, i.e. put back in play.
 * <p>
 * All of the following must hold: the child CAS was dropped or the client of this aggregate is
 * co-located; the parent's reply has been received; the parent is in {@code FINAL_STATE}; and
 * the parent has no subordinate CASes in play. The outcome and its inputs are logged at FINEST
 * level when {@code clientEndpoint} is set.
 *
 * @param casDropped
 *          whether the child CAS was dropped
 * @param clientIsCollocated
 *          whether the client of this aggregate is co-located
 * @param parentCasStateEntry
 *          local cache entry of the parent CAS; may be {@code null}
 * @return {@code true} if the parent may be released; {@code false} otherwise or when
 *         {@code parentCasStateEntry} is {@code null}
 */
public boolean releaseParentCas(boolean casDropped, boolean clientIsCollocated,
        CasStateEntry parentCasStateEntry) {
  if (parentCasStateEntry == null) {
    return false;
  }
  boolean release = false;
  if (casDropped || clientIsCollocated) {
    // Parent received by this aggregate, in final state, and without children in play
    release = parentCasStateEntry.isReplyReceived()
            && parentCasStateEntry.getState() == CacheEntry.FINAL_STATE
            && parentCasStateEntry.getSubordinateCasInPlayCount() == 0;
  }
  if (clientEndpoint != null && UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
    Object[] details = new Object[] { getComponentName(),
        parentCasStateEntry.getCasReferenceId(), release, casDropped,
        clientEndpoint.isRemote(), parentCasStateEntry.isReplyReceived(),
        parentCasStateEntry.isPendingReply(),
        parentCasStateEntry.getState() == CacheEntry.FINAL_STATE,
        parentCasStateEntry.getSubordinateCasInPlayCount(), getComponentName() };
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "finalStep",
            UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_show_why_not_releasing_parent__FINEST", details);
  }
  return release;
}
/**
 * Decides whether a CAS has to be dropped rather than returned to the client.
 * <p>
 * Only a CAS that is new and was produced by this component (its producer aggregate name equals
 * this component's name) can be dropped. Such a CAS is dropped when the given entry is marked as
 * failed, when the final step requests the drop, or when this component is not a Cas
 * Multiplier.
 *
 * @param entry
 *          local cache entry whose failed flag is consulted (the caller passes the parent's
 *          entry); may be {@code null}
 * @param cacheEntry
 *          global cache entry of the CAS in question
 * @param aStep
 *          the final step produced by the flow
 * @return {@code true} if the CAS must be dropped
 */
private boolean forceToDropTheCas(CasStateEntry entry, CacheEntry cacheEntry, FinalStep aStep) {
  String producerName = cacheEntry.getCasProducerAggregateName();
  // The CAS counts as new for this service only if this service produced it
  boolean producedHere = cacheEntry.isNewCas() && producerName != null
          && getComponentName().equals(producerName);
  boolean entryFailed = entry != null && entry.isFailed();
  if (entryFailed && producedHere) {
    // No point to continue if the CAS was produced in this aggregate and its parent failed here
    return true;
  }
  // Produced here AND (the Flow Controller wants it dropped OR this is not a Cas Multiplier)
  return producedHere && (aStep.getForceCasToBeDropped() || !isCasMultiplier());
}
/**
 * Tells whether any errors have been recorded for a CAS.
 *
 * @param casStateEntry
 *          local cache entry of the CAS
 * @return {@code true} if the entry's error list is not empty
 */
private boolean casHasExceptions(CasStateEntry casStateEntry) {
  int errorCount = casStateEntry.getErrors().size();
  return errorCount > 0;
}
/**
 * Returns the first error recorded for a CAS to the given reply endpoint.
 * <p>
 * For a subordinate CAS the reply is made on behalf of its top ancestor. A remote endpoint
 * is served through the output channel; otherwise an exception message is built and handed
 * to the dispatcher obtained from the transport for that endpoint.
 *
 * @param acacheEntry
 *          not used by this method
 * @param casStateEntry
 *          state entry of the CAS that failed
 * @param replyEndpoint
 *          the endpoint that receives the exception
 * @throws Exception
 *           if the reply cannot be produced or dispatched
 */
private void sendReplyWithException(CacheEntry acacheEntry, CasStateEntry casStateEntry,
        Endpoint replyEndpoint) throws Exception {
  // We must reply with the input CAS, so switch to the top ancestor for a subordinate CAS
  CasStateEntry replyEntry = casStateEntry.isSubordinate()
          ? getLocalCache().getTopCasAncestor(casStateEntry.getCasReferenceId())
          : casStateEntry;

  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(
            Level.FINE,
            CLASS_NAME.getName(),
            "sendReplyWithException",
            UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_returning_exception_to_client__FINE",
            new Object[] { getComponentName(), replyEntry.getCasReferenceId(),
                replyEndpoint.getEndpoint() });
  }

  if (replyEndpoint.isRemote()) {
    // Return the input CAS id together with the first recorded exception to the client
    getOutputChannel().sendReply(replyEntry.getErrors().get(0),
            replyEntry.getCasReferenceId(), null, replyEndpoint, AsynchAEMessage.Process);
    return;
  }

  // Non-remote client: build the message and hand it to the endpoint's dispatcher
  replyEndpoint.setReplyEndpoint(true);
  UimaTransport vmTransport = getTransport(replyEndpoint.getEndpoint());
  UimaMessage message = vmTransport.produceMessage(AsynchAEMessage.Process,
          AsynchAEMessage.Response, this.getName());
  message.addIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Exception);
  message.addStringProperty(AsynchAEMessage.CasReference, replyEntry.getCasReferenceId());

  Throwable cargo = replyEntry.getErrors().get(0);
  if (!(cargo instanceof UimaEEServiceException)) {
    // Strip off AsynchAEException (when it carries a cause) and wrap in UimaEEServiceException
    boolean unwrap = cargo instanceof AsynchAEException && cargo.getCause() != null;
    cargo = new UimaEEServiceException(unwrap ? cargo.getCause() : cargo);
  }
  message.addObjectProperty(AsynchAEMessage.Cargo, cargo);
  vmTransport.getUimaMessageDispatcher(replyEndpoint.getEndpoint()).dispatch(message);
}
/**
 * Determines whether the client should receive an exception instead of the CAS.
 * <p>
 * An input (non-subordinate) CAS qualifies when it is failed and has recorded errors. A
 * subordinate CAS qualifies when its top ancestor is failed, the subordinate has recorded
 * errors and the ancestor has no subordinate CASes in play. A subordinate CAS that does not
 * qualify is registered in the map of outstanding CASes as a side effect.
 *
 * @param cacheEntry
 *          not used by this method
 * @param casStateEntry
 *          state entry of the CAS being returned
 * @param replyEndpoint
 *          not used by this method
 * @return true if an exception should be returned to the client
 * @throws Exception
 *           if the top ancestor lookup fails
 */
private boolean sendExceptionToClient(CacheEntry cacheEntry, CasStateEntry casStateEntry,
        Endpoint replyEndpoint) throws Exception {
  if (!casStateEntry.isSubordinate()) {
    return casStateEntry.isFailed() && casHasExceptions(casStateEntry);
  }
  // Dont send CASes to the client if the input CAS is in failed state. The exception is
  // returned only once all descendant CASes of the top ancestor are accounted for.
  CasStateEntry rootEntry = getLocalCache().getTopCasAncestor(
          casStateEntry.getInputCasReferenceId());
  boolean reportFailure = rootEntry.isFailed() && casHasExceptions(casStateEntry)
          && rootEntry.getSubordinateCasInPlayCount() == 0;
  if (!reportFailure) {
    // Remember the generated CAS as outstanding. The map is consulted when a client sends
    // a Free CAS Notification; the id serves as both key and value for quick lookup.
    cmOutstandingCASes
            .put(casStateEntry.getCasReferenceId(), casStateEntry.getCasReferenceId());
  }
  return reportFailure;
}
/**
 * Replies to a remote client, either with an exception or with the CAS itself.
 * <p>
 * After a regular reply an input (non-subordinate) CAS is dropped, and the process is ended
 * if the in-process cache became empty.
 *
 * @param cacheEntry
 *          cache entry of the CAS being returned
 * @param casStateEntry
 *          state entry of the CAS being returned
 * @param replyEndpoint
 *          the client endpoint
 * @throws Exception
 *           if the reply cannot be sent
 */
private void sendReplyToRemoteClient(CacheEntry cacheEntry, CasStateEntry casStateEntry,
        Endpoint replyEndpoint) throws Exception {
  if (sendExceptionToClient(cacheEntry, casStateEntry, replyEndpoint)) {
    sendReplyWithException(cacheEntry, casStateEntry, replyEndpoint);
    return;
  }
  // Send response to the given endpoint
  getOutputChannel().sendReply(cacheEntry, replyEndpoint);
  // A CAS that has a parent is left alone here; the client is expected to send a
  // Release CAS notification for it.
  if (casStateEntry.isSubordinate()) {
    return;
  }
  dropCAS(casStateEntry.getCasReferenceId(), true);
  // If the cache is empty change the state of the Aggregate to idle
  if (getInProcessCache().isEmpty()) {
    endProcess(AsynchAEMessage.Process);
  }
}
/**
 * Replies to a collocated client, either with an exception or with the CAS itself.
 * <p>
 * When an exception has to be returned, the CAS is dropped afterwards if it was produced in
 * this aggregate, whether or not the exception reply could be delivered. A delivery failure
 * is logged as a warning instead of being silently discarded.
 * <p>
 * Otherwise the CAS is sent through the VM transport: as a request when it is a subordinate
 * CAS produced in this Cas Multiplier aggregate, as a response in all other cases.
 *
 * @param cacheEntry
 *          cache entry of the CAS being returned
 * @param casStateEntry
 *          state entry of the CAS being returned
 * @param replyEndpoint
 *          the collocated client endpoint
 * @throws Exception
 *           if the regular (non-exception) reply cannot be sent
 */
private void sendReplyToCollocatedClient(CacheEntry cacheEntry, CasStateEntry casStateEntry,
        Endpoint replyEndpoint) throws Exception {
  boolean casProducedInThisAggregate = getComponentName().equals(
          cacheEntry.getCasProducerAggregateName());
  boolean isSubordinate = casStateEntry.isSubordinate();
  boolean serviceIsCM = isCasMultiplier();
  if (sendExceptionToClient(cacheEntry, casStateEntry, replyEndpoint)) {
    try {
      sendReplyWithException(cacheEntry, casStateEntry, replyEndpoint);
    } catch (Exception e) {
      // Best effort: a failed exception reply must not prevent the cleanup below, but it
      // must not vanish without a trace either.
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReplyToCollocatedClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_exception__WARNING", new Object[] { e });
      }
    } finally {
      if (casProducedInThisAggregate) {
        // Drop the CAS generated in this Aggregate
        dropCAS(casStateEntry.getCasReferenceId(), true);
      }
    }
  } else {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(
              Level.FINE,
              CLASS_NAME.getName(),
              "sendReplyToCollocatedClient",
              UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_sending_reply_to_client__FINE",
              new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),
                  replyEndpoint.getEndpoint() });
    }
    int mType = AsynchAEMessage.Response;
    // Check if the CAS was produced in this aggregate by any of its delegates
    // If so, send the CAS as a request. Otherwise, the CAS is an input CAS and
    // needs to return as reply.
    if (isSubordinate && serviceIsCM && casProducedInThisAggregate) {
      // this is a Cas Multiplier, send this CAS to the client in a request message.
      mType = AsynchAEMessage.Request;
      // Return the CAS to the colocated client. First make sure that this CAS
      // is associated with the input CAS. This CAS may have been produced from
      // an intermediate CAS (which was produced from the input CAS). From the
      // client perspective, this Cas Multiplier Aggregate is a black box,
      // all CASes produced here must be linked with the input CAS.
      // Find the top ancestor of this CAS. It is the input CAS sent by the client
      String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry);
      // Modify the parent of this CAS.
      if (inputCasId != null && !inputCasId.equals(casStateEntry.getInputCasReferenceId())) {
        casStateEntry.setInputCasReferenceId(inputCasId);
        cacheEntry.setInputCasReferenceId(inputCasId);
      }
    }
    // Send CAS to a given reply endpoint
    sendVMMessage(mType, replyEndpoint, cacheEntry);
  }
}
/**
 * Checks whether a reply may be sent to the given client endpoint.
 * <p>
 * A null endpoint is logged and rejected. An endpoint without a destination name is logged,
 * reported through {@code handleError} with an {@code UnknownDestinationException}, and
 * rejected. An endpoint flagged as Cas Multiplier is rejected without logging.
 *
 * @param endpoint
 *          the candidate reply endpoint, may be null
 * @param casStateEntry
 *          state entry of the CAS, used for its reference id
 * @return true if a reply should be sent to the endpoint
 */
private boolean validEndpoint(Endpoint endpoint, CasStateEntry casStateEntry) {
  if (endpoint == null) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
              "validEndpoint", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_client_endpoint_not_found__INFO",
              new Object[] { getComponentName(), casStateEntry.getCasReferenceId() });
    }
    return false;
  }

  boolean hasDestination = endpoint.getEndpoint() != null;
  if (!hasDestination) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
              "validEndpoint", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_no_reply_destination__INFO",
              new Object[] { casStateEntry.getCasReferenceId() });
    }
    // Hand the problem over to the error handling, identifying the CAS and the command
    HashMap errorDetails = new HashMap();
    errorDetails.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
    errorDetails.put(AsynchAEMessage.CasReference, casStateEntry.getCasReferenceId());
    handleError(errorDetails, new UnknownDestinationException());
    return false;
  }

  // Dont send a reply to the client if the client is a CAS multiplier
  return !endpoint.isCasMultiplier();
}
/**
 * Looks up the client endpoint for a CAS and, unless the controller is stopped, sends the
 * reply to it.
 * <p>
 * Exceptions raised while sending are logged as warnings and not propagated.
 *
 * @param cacheEntry
 *          cache entry of the CAS being returned
 * @param casStateEntry
 *          state entry of the CAS being returned
 * @return the endpoint the reply was addressed to, or null if no valid endpoint was found
 * @throws Exception
 *           if the endpoint lookup or validation fails
 */
private Endpoint replyToClient(CacheEntry cacheEntry, CasStateEntry casStateEntry)
        throws Exception {
  Endpoint endpoint = getReplyEndpoint(cacheEntry, casStateEntry);
  boolean usable = validEndpoint(endpoint, casStateEntry);
  if (!usable) {
    return null; // the reason has already been logged
  }
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
    // Elapsed time since the endpoint's entry time, converted from nanoseconds
    double elapsed = (double) (System.nanoTime() - endpoint.getEntryTime()) / (double) 1000000;
    UIMAFramework.getLogger(CLASS_NAME).logrb(
            Level.FINEST,
            CLASS_NAME.getName(),
            "replyToClient",
            UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_final_step__FINEST",
            new Object[] { casStateEntry.getCasReferenceId(), elapsed });
  }
  endpoint.setFinal(true);
  if (isStopped()) {
    return endpoint;
  }
  try {
    if (endpoint.isRemote()) {
      sendReplyToRemoteClient(cacheEntry, casStateEntry, endpoint);
    } else {
      sendReplyToCollocatedClient(cacheEntry, casStateEntry, endpoint);
    }
  } catch (Exception e) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
            "replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_exception__WARNING", new Object[] { e });
  }
  return endpoint;
}
/**
 * Builds a Process message for a CAS and dispatches it through the transport obtained for
 * the given endpoint.
 * <p>
 * The message carries the CAS reference id, the input CAS reference id when present, the
 * CAS sequence when the CAS was produced by this component, per-CAS timing statistics and
 * the idle time between process calls.
 *
 * @param messageType
 *          the message type to produce (callers in this class pass Request or Response)
 * @param endpoint
 *          the endpoint that receives the message
 * @param cacheEntry
 *          cache entry of the CAS being sent
 * @throws Exception
 *           if the message cannot be produced or dispatched
 */
private void sendVMMessage(int messageType, Endpoint endpoint, CacheEntry cacheEntry)
        throws Exception {
  UimaTransport transport = getTransport(endpoint.getEndpoint());
  UimaMessage message = transport.produceMessage(AsynchAEMessage.Process, messageType, getName());

  // The sequence number is only added for a CAS produced by this component
  String producer = cacheEntry.getCasProducerAggregateName();
  if (producer != null && producer.equals(getComponentName())) {
    message.addLongProperty(AsynchAEMessage.CasSequence, cacheEntry.getCasSequence());
  }

  message.addStringProperty(AsynchAEMessage.CasReference, cacheEntry.getCasReferenceId());
  String inputCasId = cacheEntry.getInputCasReferenceId();
  if (inputCasId != null) {
    message.addStringProperty(AsynchAEMessage.InputCasReference, inputCasId);
  }

  // Timing statistics collected for this CAS
  ServicePerformance casStats = getCasStatistics(cacheEntry.getCasReferenceId());
  message.addLongProperty(AsynchAEMessage.TimeToSerializeCAS,
          casStats.getRawCasSerializationTime());
  message.addLongProperty(AsynchAEMessage.TimeToDeserializeCAS,
          casStats.getRawCasDeserializationTime());
  message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
  message.addLongProperty(AsynchAEMessage.IdleTime,
          getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process));

  // Send reply back to the client. Use internal (non-jms) transport
  transport.getUimaMessageDispatcher(endpoint.getEndpoint()).dispatch(message);
}
/**
 * Finds the endpoint of the client that should receive the reply for a CAS.
 * <p>
 * A top level component reads the endpoint from the in-process cache: the top ancestor's
 * endpoint for a subordinate CAS, otherwise the endpoint stored for the CAS itself. Any
 * other component resolves the message origin and then drops the flow of the CAS.
 *
 * @param cacheEntry
 *          cache entry of the CAS
 * @param casStateEntry
 *          state entry of the CAS
 * @return the reply endpoint, possibly null
 * @throws Exception
 *           if a cache lookup fails
 */
private Endpoint getReplyEndpoint(CacheEntry cacheEntry, CasStateEntry casStateEntry)
        throws Exception {
  if (!isTopLevelComponent()) {
    Endpoint origin = getReplyEndpoint(cacheEntry);
    dropFlow(casStateEntry.getCasReferenceId(), false);
    return origin;
  }
  if (casStateEntry.isSubordinate()) {
    return getInProcessCache().getTopAncestorEndpoint(cacheEntry);
  }
  // Passing null as the first argument asks the cache for the origin of the request
  return getInProcessCache().getEndpoint(null, casStateEntry.getCasReferenceId());
}
/**
 * Walks up the chain of input CASes, starting at the given cache entry, until a message
 * origin is found.
 *
 * @param cacheEntry
 *          the entry to start from, may be null
 * @return the first message origin found, or null if the chain ends without one
 * @throws Exception
 *           if a cache lookup fails
 */
private Endpoint getReplyEndpoint(CacheEntry cacheEntry) throws Exception {
  CacheEntry current = cacheEntry;
  while (current != null) {
    Endpoint origin = getMessageOrigin(current.getCasReferenceId());
    if (origin != null || current.getInputCasReferenceId() == null) {
      return origin;
    }
    // No origin on this CAS: continue with the CAS it was produced from
    current = getInProcessCache().getCacheEntryForCAS(current.getInputCasReferenceId());
  }
  return null;
}
/**
 * Asks the flow for the next step of a CAS and executes it.
 * <p>
 * A failure while computing the next step terminates the service: a shutdown exception is
 * sent for the CAS and the TERMINATE action is triggered. If that termination attempt fails
 * as well, the exception raised by the attempt itself is logged. A failure while executing
 * the step is passed to {@code handleError}.
 *
 * @param aFlow
 *          the flow that provides the next step
 * @param aCasReferenceId
 *          id of the CAS the step applies to
 * @param newCAS
 *          true if the CAS has just been produced by a Cas Multiplier
 * @throws AsynchAEException
 *           declared by the signature
 */
private void executeFlowStep(FlowContainer aFlow, String aCasReferenceId, boolean newCAS)
        throws AsynchAEException {
  Step step = null;
  try {
    // Guard a call to next(). Allow one thread and block the rest
    synchronized (flowControllerContainer) {
      step = aFlow.next();
    }
  } catch (Exception e) {
    // Any error here is automatic termination
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
            "executeFlowStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_exception__WARNING", new Object[] { e });
    try {
      sendReplyWithShutdownException(aCasReferenceId);
      ErrorContext ec = new ErrorContext();
      ec.add(ErrorContext.THROWABLE_ERROR, e);
      ec.add(AsynchAEMessage.CasReference, aCasReferenceId);
      handleAction(ErrorHandler.TERMINATE, null, ec);
    } catch (Exception ex) {
      // Log the failure of the termination attempt itself (ex); the flow error (e) has
      // already been logged above.
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "executeFlowStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_exception__WARNING", new Object[] { ex });
      }
    }
    return;
  }
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
            "executeFlowStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_step__FINEST",
            new Object[] { getComponentName(), aCasReferenceId });
  }
  try {
    if (step instanceof SimpleStep) {
      simpleStep((SimpleStep) step, aCasReferenceId);
    } else if (step instanceof ParallelStep) {
      parallelStep((ParallelStep) step, aCasReferenceId);
    } else if (step instanceof FinalStep) {
      // Special case: check if this CAS has just been produced by a Cas Multiplier.
      // If so, we received a new CAS but there are no delegates in the pipeline.
      // The CM was the last in the flow. In this case, set a property in the cache
      // to simulate receipt of the reply to this CAS. This is so that the CAS is
      // released in the finalStep() when the Aggregate is not a Cas Multiplier.
      if (newCAS) {
        CasStateEntry casStateEntry = localCache.lookupEntry(aCasReferenceId);
        if (casStateEntry != null) {
          casStateEntry.setReplyReceived();
        }
      }
      finalStep((FinalStep) step, aCasReferenceId);
    }
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
              "executeFlowStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_completed_step__FINEST",
              new Object[] { getComponentName(), aCasReferenceId });
    }
  } catch (Exception e) {
    // Identify the CAS and the command for the error handling
    HashMap map = new HashMap();
    map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
    map.put(AsynchAEMessage.CasReference, aCasReferenceId);
    handleError(map, e);
  }
}
/**
 * Sends a process request for a CAS to the given delegate endpoint.
 * <p>
 * A remote delegate is served through the output channel unless the CAS gets delayed
 * because the delegate is in the timed out state. Any other delegate receives a request
 * message through the transport obtained for its endpoint; failures on that path are
 * logged as warnings and not propagated.
 *
 * @param aCasReferenceId
 *          id of the CAS to dispatch
 * @param anEndpoint
 *          the delegate endpoint
 * @throws AsynchAEException
 *           if the remote dispatch fails
 */
private void dispatch(String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException {
  if (anEndpoint.isRemote()) {
    // Check delegate's state before sending it a CAS. The delegate may have previously
    // timed out and is being pinged to check its availability. While it is in this state,
    // CASes are placed on a list of CASes pending dispatch and sent once the ping reply
    // is received.
    boolean delayed = delayCasIfDelegateInTimedOutState(aCasReferenceId,
            anEndpoint.getDelegateKey());
    if (!delayed) {
      // The delegate is in the normal state so send it this CAS
      getOutputChannel().sendRequest(aCasReferenceId, anEndpoint);
    }
    return;
  }
  try {
    UimaTransport transport = getTransport(anEndpoint.getEndpoint());
    UimaMessage request = transport.produceMessage(AsynchAEMessage.Process,
            AsynchAEMessage.Request, getName());
    request.addStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
    transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(request);
  } catch (Exception e) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
              "dispatch", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_exception__WARNING", new Object[] { e });
    }
  }
}
/**
 * Delays a CAS if its target delegate is in TIMEOUT state.
 * <p>
 * When the delegate is in that state the CAS id is appended to the delegate's list of CASes
 * pending dispatch. If this CAS is the only one on the list, the delegate is flagged as
 * awaiting a ping reply and a GetMeta request is sent to it as a ping. The CASes on the
 * list are expected to be dispatched once the delegate answers the ping.
 *
 * @param aCasReferenceId
 *          id of the CAS about to be dispatched
 * @param aDelegateKey
 *          key used to look up the delegate
 * @return true if the CAS was placed on the pending dispatch list, false if it was not
 *         delayed
 * @throws AsynchAEException
 *           if the ping request cannot be sent
 **/
public boolean delayCasIfDelegateInTimedOutState(String aCasReferenceId, String aDelegateKey)
        throws AsynchAEException {
  Delegate delegate = lookupDelegate(aDelegateKey);
  if (delegate == null || delegate.getState() != Delegate.TIMEOUT_STATE) {
    return false; // Cas Not Delayed
  }
  // Add CAS id to the list of delayed CASes.
  int pendingCount = delegate.addCasToPendingDispatchList(aCasReferenceId);
  // If the list was empty (before the add), send the GetMeta request
  // as a PING to see if the delegate service is alive.
  if (pendingCount == 1) {
    delegate.setAwaitingPingReply();
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
              "delayCasIfDelegateInTimedOutState", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_aggregate_sending_ping__INFO",
              new Object[] { getComponentName(), delegate.getKey() });
    }
    retryMetadataRequest(delegate.getEndpoint());
  }
  return true;
}
/**
 * Dispatches a process request for a CAS to a single delegate endpoint.
 *
 * @param aCasReferenceId
 *          id of the CAS to dispatch
 * @param anEndpoint
 *          the delegate endpoint; this controller is set on it before dispatching
 * @param addEndpointToCache
 *          if true, the endpoint is first registered in the in-process cache for the CAS
 * @throws AsynchAEException
 *           if the dispatch fails
 */
private void dispatchProcessRequest(String aCasReferenceId, Endpoint anEndpoint,
        boolean addEndpointToCache) throws AsynchAEException {
  if (addEndpointToCache) {
    InProcessCache cache = getInProcessCache();
    cache.addEndpoint(anEndpoint, aCasReferenceId);
  }
  anEndpoint.setController(this);
  this.dispatch(aCasReferenceId, anEndpoint);
}
/**
 * Re-dispatches a process request for a CAS to a delegate.
 * <p>
 * If an endpoint is already cached for the CAS, it is reused and its reply destination is
 * refreshed from the master endpoint when the two differ (or when the cached endpoint has
 * no destination yet). Otherwise a clone of the master endpoint is cached and used; should
 * no master endpoint be registered for the delegate, the supplied endpoint is used instead.
 *
 * @param aCasReferenceId
 *          id of the CAS to retry
 * @param anEndpoint
 *          the delegate endpoint the CAS was originally sent to
 * @param addEndpointToCache
 *          forwarded to the dispatch; if true the endpoint is registered in the cache
 * @throws AsynchAEException
 *           if the dispatch fails
 */
public void retryProcessCASRequest(String aCasReferenceId, Endpoint anEndpoint,
        boolean addEndpointToCache) throws AsynchAEException {
  String key = lookUpDelegateKey(anEndpoint.getEndpoint());
  Endpoint endpoint = getInProcessCache().getEndpoint(anEndpoint.getEndpoint(), aCasReferenceId);
  if (endpoint != null) {
    Endpoint masterEndpoint = lookUpEndpoint(key, true);
    // check if the master endpoint destination has changed. This can be a case when
    // a new temp queue is created when the previous temp queue is destroyed due to
    // a broken connection.
    if (masterEndpoint != null && masterEndpoint.getDestination() != null) {
      Object cachedDestination = endpoint.getDestination();
      // Make sure that we use the current destination for replies. A cached endpoint
      // without any destination is refreshed as well instead of failing on a null.
      if (cachedDestination == null
              || !masterEndpoint.getDestination().toString().equals(cachedDestination.toString())) {
        // Override the endpoint reply-to destination with the master destination
        endpoint.setDestination(masterEndpoint.getDestination());
      }
    }
  } else {
    endpoint = lookUpEndpoint(key, true);
    if (endpoint == null) {
      // No master endpoint registered under this key: fall back to the caller's endpoint
      endpoint = anEndpoint;
    }
    getInProcessCache().addEndpoint(endpoint, aCasReferenceId);
  }
  dispatchProcessRequest(aCasReferenceId, endpoint, addEndpointToCache);
}
/**
 * Dispatches a process request for a CAS to several delegate endpoints at once.
 * <p>
 * Endpoints whose delegate is in the timed out state are left out; the CAS is placed on
 * their pending dispatch list instead. The remaining endpoints are passed to the output
 * channel in a single call.
 *
 * @param aCasReferenceId
 *          id of the CAS to dispatch
 * @param anEndpointList
 *          the candidate delegate endpoints
 * @param addEndpointToCache
 *          if true, each endpoint that is not delayed is registered in the in-process cache
 * @throws AsynchAEException
 *           if the request cannot be sent
 */
private void dispatchProcessRequest(String aCasReferenceId, Endpoint[] anEndpointList,
        boolean addEndpointToCache) throws AsynchAEException {
  List<Endpoint> readyEndpoints = new ArrayList<Endpoint>();
  for (Endpoint candidate : anEndpointList) {
    // Check if the delegate previously timed out. If so, the CAS id goes to the list
    // pending dispatch, which holds CASes delayed until the service responds to a Ping.
    // NOTE(review): the endpoint name is passed here where dispatch() passes the delegate
    // key; confirm both identify the same delegate.
    boolean delayed = delayCasIfDelegateInTimedOutState(aCasReferenceId,
            candidate.getEndpoint());
    if (delayed) {
      continue;
    }
    readyEndpoints.add(candidate);
    if (addEndpointToCache) {
      getInProcessCache().addEndpoint(candidate, aCasReferenceId);
    }
  }
  Endpoint[] targets = readyEndpoints.toArray(new Endpoint[readyEndpoints.size()]);
  getOutputChannel().sendRequest(aCasReferenceId, targets);
}
/**
 * Tells whether the given delegate key is registered in the destination map.
 *
 * @param aDelegateKey
 *          the key to check
 * @return true if the destination map contains the key
 */
public boolean isDelegateKeyValid(String aDelegateKey) {
  boolean known = destinationMap.containsKey(aDelegateKey);
  return known;
}
/**
 * Returns a delegate key given an endpoint (queue) name only; the server URI is not used
 * for matching.
 *
 * @param anEndpointName
 *          the endpoint (queue) name
 * @return the matching delegate key, or null if none is found
 */
public String lookUpDelegateKey(String anEndpointName) {
  final String anyServer = null;
  return lookUpDelegateKey(anEndpointName, anyServer);
}
/**
 * Returns a delegate key given an endpoint (queue) name and a server uri. If a server is
 * null, only the endpoint name is used for matching. Both comparisons ignore case.
 *
 * @param anEndpointName
 *          the endpoint (queue) name to look for
 * @param server
 *          the server URI that must match as well, or null to match on the name alone
 * @return the key of the first matching endpoint, or null if there is no match
 */
public String lookUpDelegateKey(String anEndpointName, String server) {
  if (!destinationToKeyMap.containsKey(anEndpointName)) {
    return null;
  }
  // Endpoints managed by different servers may share a queue name, so walk over all
  // registered endpoints until a [queue] or [queue,server] match is found.
  for (Object candidate : destinationMap.keySet()) {
    String key = (String) candidate;
    Endpoint_impl endp = (Endpoint_impl) destinationMap.get(key);
    boolean queueMatches = endp != null && endp.getEndpoint().equalsIgnoreCase(anEndpointName);
    if (!queueMatches) {
      continue;
    }
    if (server == null) {
      // found a match for [queue]
      return key;
    }
    if (endp.getServerURI() != null && endp.getServerURI().equalsIgnoreCase(server)) {
      // found a match for [queue,server]
      return key;
    }
  }
  return null;
}
/**
 * Looks up the endpoint registered under the given delegate key.
 *
 * @param anAnalysisEngineKey
 *          the delegate key
 * @param clone
 *          if true, a clone of the registered endpoint is returned instead of the
 *          registered instance
 * @return the endpoint (or its clone), or null if nothing is registered under the key
 * @throws AsynchAEException
 *           declared by the signature
 */
public Endpoint lookUpEndpoint(String anAnalysisEngineKey, boolean clone)
        throws AsynchAEException {
  Endpoint registered = (Endpoint) destinationMap.get(anAnalysisEngineKey);
  if (registered == null || !clone) {
    return registered;
  }
  return (Endpoint) ((Endpoint_impl) registered).clone();
}
/**
 * Returns the service info stored in the statistics array of a delegate.
 *
 * @param aDelegateKey
 *          the delegate key, may be null
 * @return the service info, or null if the statistics map is missing, the key is null or
 *         no statistics are registered under the key
 */
public PrimitiveServiceInfo getDelegateServiceInfo(String aDelegateKey) {
  boolean lookupPossible = delegateStatMap != null && aDelegateKey != null
          && delegateStatMap.containsKey(aDelegateKey);
  if (!lookupPossible) {
    return null;
  }
  Object[] stats = (Object[]) delegateStatMap.get(aDelegateKey);
  return stats == null ? null : (PrimitiveServiceInfo) stats[SERVICE_INFO_INDX];
}
/**
 * Returns the performance object stored in the statistics array of a delegate.
 * <p>
 * Like {@code getDelegateServiceInfo}, a missing statistics map or a null key yields null
 * instead of an exception.
 *
 * @param aDelegateKey
 *          the delegate key, may be null
 * @return the performance object, or null if none is registered under the key
 */
public ServicePerformance getDelegateServicePerformance(String aDelegateKey) {
  if (delegateStatMap == null || aDelegateKey == null) {
    return null;
  }
  Object[] delegateStats = (Object[]) delegateStatMap.get(aDelegateKey);
  if (delegateStats != null) {
    return (ServicePerformance) delegateStats[SERVICE_PERF_INDX];
  }
  return null;
}
/**
 * Returns the errors object stored in the statistics array of a delegate.
 * <p>
 * Like {@code getDelegateServiceInfo}, a missing statistics map or a null key yields null
 * instead of an exception.
 *
 * @param aDelegateKey
 *          the delegate key, may be null
 * @return the errors object, or null if none is registered under the key
 */
public ServiceErrors getDelegateServiceErrors(String aDelegateKey) {
  if (delegateStatMap == null || aDelegateKey == null) {
    return null;
  }
  Object[] delegateStats = (Object[]) delegateStatMap.get(aDelegateKey);
  if (delegateStats != null) {
    return (ServiceErrors) delegateStats[SERVICE_ERROR_INDX];
  }
  return null;
}
/**
 * Merges metadata received from a delegate that is identified by its queue name only; no
 * server URI is supplied for matching.
 *
 * @param aTypeSystem
 *          the serialized metadata received from the delegate
 * @param fromDestination
 *          the queue name of the delegate
 * @throws AsynchAEException
 *           if the merge fails
 */
public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException {
  final String unknownServer = null;
  mergeTypeSystem(aTypeSystem, fromDestination, unknownServer);
}
/**
 * Processes metadata received from a delegate and, once every delegate has been
 * initialized, completes the initialization of this controller.
 * <p>
 * The delegate endpoint is resolved from the queue name and, when given, the server URI.
 * A reply from an unknown delegate is logged and ignored; a reply for an endpoint that is
 * not waiting for a response is ignored. A non-blank payload is parsed as resource
 * metadata and registered with the CAS manager; a blank payload marks the sender as a
 * collocated aggregate.
 *
 * @param aTypeSystem
 *          the serialized metadata; a blank string is handled as described above
 * @param fromDestination
 *          the queue name of the delegate that sent the metadata
 * @param fromServer
 *          the server URI of the delegate, or null to match on the queue name alone
 * @throws AsynchAEException
 *           wraps any exception raised while processing the metadata
 */
public synchronized void mergeTypeSystem(String aTypeSystem, String fromDestination,
String fromServer) throws AsynchAEException {
try {
// Find the endpoint for this service, given its input queue name and broker URI.
// We now allow endpoints managed by different servers to have the same queue name.
// But if the external name of the broker is unknown (i.e. an old 2.2.2 service)
// then use just the queue name, i.e. queue names must be unique for 2.2.2
Endpoint_impl endpoint = null;
String key = lookUpDelegateKey(fromDestination, fromServer);
if (key != null) {
endpoint = (Endpoint_impl) destinationMap.get(key);
}
if (endpoint == null) {
// Log invalid reply and move on
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"mergeTypeSystem", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_metadata_recvd_from_invalid_delegate__INFO",
new Object[] { getName(), fromDestination });
}
} else if (endpoint.isWaitingForResponse()) {
// The reply is only processed while the endpoint is flagged as waiting for it;
// clear the flag and cancel the endpoint's timer first.
endpoint.setWaitingForResponse(false);
endpoint.cancelTimer();
boolean collocatedAggregate = false;
if ( endpoint.getServiceInfo() != null ) {
endpoint.getServiceInfo().setState(ServiceState.RUNNING.name());
}
ResourceMetaData resource = null;
ServiceInfo remoteDelegateServiceInfo = null;
if (aTypeSystem.trim().length() > 0) {
if (endpoint.isRemote()) {
System.out.println("Remote Service:" + key
+ " Initialized. Ready To Process Messages From Queue:"
+ endpoint.getEndpoint());
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
"mergeTypeSystem", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_remote_delegate_ready__CONFIG",
new Object[] { getComponentName(), fromDestination });
}
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
"mergeTypeSystem", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_merge_ts_from_delegate__CONFIG", new Object[] { fromDestination });
}
// NOTE(review): getBytes() without a charset uses the platform default encoding;
// confirm this agrees with the encoding of the received XML.
ByteArrayInputStream bis = new ByteArrayInputStream(aTypeSystem.getBytes());
XMLInputSource in1 = new XMLInputSource(bis, null);
resource = UIMAFramework.getXMLParser().parseResourceMetaData(in1);
// Nothing is registered if the controller was stopped in the meantime
if (isStopped()) {
return;
}
getCasManagerWrapper().addMetadata((ProcessingResourceMetaData) resource);
analysisEngineMetaDataMap.put(key, (ProcessingResourceMetaData) resource);
// A delegate whose metadata says it outputs new CASes is recorded as a Cas Multiplier
if (((ProcessingResourceMetaData) resource).getOperationalProperties()
.getOutputsNewCASes()) {
endpoint.setIsCasMultiplier(true);
remoteCasMultiplierList.add(key);
}
if (endpoint.isRemote()) {
Object o = null;
remoteDelegateServiceInfo = getDelegateServiceInfo(key);
// Copy the instance count from the delegate's configuration parameters, if present
if (remoteDelegateServiceInfo != null
&& (o = ((ProcessingResourceMetaData) resource)
.getConfigurationParameterSettings().getParameterValue(
AnalysisEngineController.AEInstanceCount)) != null) {
((PrimitiveServiceInfo) remoteDelegateServiceInfo)
.setAnalysisEngineInstanceCount(((Integer) o).intValue());
}
}
} else {
// A blank payload: the sender is treated as a collocated aggregate
collocatedAggregate = true;
}
endpoint.setInitialized(true);
// If getMeta request not yet sent, send meta request to all remote delegate
// Special case when all delegates are remote is handled in the setInputChannel
synchronized (unregisteredDelegateList) {
// TODO can't find where this list is checked. Is it still used???
if (requestForMetaSentToRemotes == false && !allDelegatesAreRemote) {
String unregisteredDelegateKey = null;
// NOTE(review): removing by index while i keeps advancing skips the element that
// moves into slot i; harmless only if the key occurs at most once in the list.
for (int i = 0; i < unregisteredDelegateList.size(); i++) {
unregisteredDelegateKey = (String) unregisteredDelegateList.get(i);
if (unregisteredDelegateKey.equals(key)) {
unregisteredDelegateList.remove(i);
}
}
}
}
//
if (collocatedAggregate || resource instanceof ProcessingResourceMetaData) {
if (allTypeSystemsMerged()) {
// Initialize a shadow CAS pool for every remote Cas Multiplier delegate
for (int i = 0; i < remoteCasMultiplierList.size(); i++) {
Endpoint endpt = (Endpoint) destinationMap.get((String) remoteCasMultiplierList
.get(i));
if (endpt != null && endpt.isCasMultiplier() && endpt.isRemote()) {
System.out.println("Setting Shadow Pool of Size:" + endpt.getShadowPoolSize()
+ " For Cas Multiplier:" + (String) remoteCasMultiplierList.get(i));
getCasManagerWrapper().initialize(endpt.getShadowPoolSize(),
(String) remoteCasMultiplierList.get(i));
// NOTE(review): remoteDelegateServiceInfo belongs to the delegate whose reply is
// being processed (key), not necessarily to the Cas Multiplier at index i.
if (remoteDelegateServiceInfo != null) {
remoteDelegateServiceInfo.setCASMultiplier();
}
}
}
if (!isStopped()) {
try {
completeInitialization();
} catch (ResourceInitializationException ex) {
handleInitializationError(ex);
return;
}
}
}
}
}
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
/**
 * Finishes the initialization of this controller. Calls made after the controller has been
 * marked as initialized return immediately.
 * <p>
 * The method installs the default error handler chain when none is set, creates the flow
 * controller container, sets up the CAS manager and the aggregate metadata for a top level
 * component, removes disabled delegates from the flow, starts the cleanup thread, notifies
 * collocated delegates, opens the latch and finally notifies the listeners.
 *
 * @throws Exception
 *           if any of the initialization steps fails
 */
private synchronized void completeInitialization() throws Exception {
if (initialized) {
return;
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(),
"completeInitialization", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_all_ts_merged__CONFIG");
}
if (errorHandlerChain == null) {
plugInDefaultErrorHandlerChain();
}
AnalysisEngineDescription specifier = (AnalysisEngineDescription) super.getResourceSpecifier();
aggregateMetadata = specifier.getAnalysisEngineMetaData();
// Create the flow controller container from the descriptor and the delegates' metadata
flowControllerContainer = UimaClassFactory.produceAggregateFlowControllerContainer(specifier,
flowControllerDescriptor, analysisEngineMetaDataMap, getUimaContextAdmin(),
((AnalysisEngineDescription) getResourceSpecifier()).getSofaMappings(), super
.getManagementInterface());
if (isTopLevelComponent()) {
// Add FC's meta
getCasManagerWrapper().addMetadata((ProcessingResourceMetaData)flowControllerContainer.getMetaData());
// Top level component is the outer most component in the containment hierarchy.
getCasManagerWrapper().initialize("AggregateContext");
// Publish the CAS manager's type system, priorities and indexes as the aggregate's own
aggregateMetadata.setTypeSystem(getCasManagerWrapper().getMetadata().getTypeSystem());
aggregateMetadata.setTypePriorities(getCasManagerWrapper().getMetadata().getTypePriorities());
aggregateMetadata.setFsIndexCollection(getCasManagerWrapper().getMetadata()
.getFsIndexCollection());
}
if (disabledDelegateList.size() > 0) {
flowControllerContainer.removeAnalysisEngines(disabledDelegateList);
}
// start a cleanup thread
((BaseAnalysisEngineController) this).startServiceCleanupThread(30000); // sleep for 30 secs
// Before processing CASes, send notifications to all collocated delegates to
// complete initialization. Currently this call forces all collocated Cas Multiplier delegates
// to initialize their internal Cas Pools. CM Cas Pool is lazily initialized on
// the first process call. The JMX Monitor needs all the Cas Pools to initialize
// before the process call.
onInitialize();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"completeInitialization", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_initialized_controller__INFO", new Object[] { getComponentName() });
}
// Open latch to allow messages to be processed. The
// latch was closed to prevent messages from entering
// the controller before it is initialized.
latch.openLatch(getName(), isTopLevelComponent(), true);
// NOTE: the flag is set only after the latch has been opened
initialized = true;
// Notify client listener that the initialization of the controller was successful
notifyListenersWithInitializationStatus(null);
// If this is a collocated aggregate change its state to RUNNING from INITIALIZING.
// The top level aggregate state is changed when listeners on its input queue are
// successfully started in SpringContainerDeployer.doStartListeners() method.
if ( !isTopLevelComponent() ) {
changeState(ServiceState.RUNNING);
}
}
/**
 * Finds the delegate key whose endpoint name equals the given destination.
 *
 * @param fromDestination
 *          the endpoint name to look for
 * @return the key of the first matching entry in the destination map, or null if none
 *         matches
 */
private String findKeyForValue(String fromDestination) {
  for (Object item : destinationMap.entrySet()) {
    Map.Entry entry = (Map.Entry) item;
    Endpoint endpoint = (Endpoint) entry.getValue();
    if (endpoint == null) {
      continue;
    }
    String endpointName = endpoint.getEndpoint();
    if (endpointName.equals(fromDestination)) {
      return (String) entry.getKey();
    }
  }
  return null;
}
/**
 * Tells whether every delegate that is not disabled has been initialized.
 *
 * @return true if the merged flag is already set, or if no entry of the destination map
 *         holds an enabled endpoint that is still uninitialized
 */
private boolean allTypeSystemsMerged() {
  if (typeSystemsMerged) {
    return true;
  }
  int respondedCount = 0;
  for (Object item : destinationMap.entrySet()) {
    Endpoint endpoint = (Endpoint) ((Map.Entry) item).getValue();
    boolean stillPending = endpoint != null && endpoint.getStatus() != Endpoint.DISABLED
            && !endpoint.isInitialized();
    if (stillPending) {
      break; // At least one delegate has not replied to GetMeta Request
    }
    respondedCount++;
  }
  // All delegates responded to GetMeta request only if the scan covered the whole map
  return respondedCount == destinationMap.size();
}
/**
 * Registers the process count, process error count and process error retry count
 * statistics with the monitor, each only if it is not registered yet.
 *
 * @throws AsynchAEException
 *           declared by the signature
 */
public void initialize() throws AsynchAEException {
  Statistic processCount = getMonitor().getLongNumericStatistic("", Monitor.ProcessCount);
  if (processCount == null) {
    processCount = new LongNumericStatistic(Monitor.ProcessCount);
    getMonitor().addStatistic("", processCount);
  }

  Statistic errorCount = getMonitor().getLongNumericStatistic("", Monitor.ProcessErrorCount);
  if (errorCount == null) {
    errorCount = new LongNumericStatistic(Monitor.ProcessErrorCount);
    getMonitor().addStatistic("", errorCount);
  }

  Statistic retryCount = getMonitor().getLongNumericStatistic("",
          Monitor.ProcessErrorRetryCount);
  if (retryCount == null) {
    retryCount = new LongNumericStatistic(Monitor.ProcessErrorRetryCount);
    getMonitor().addStatistic("", retryCount);
  }
}
/**
 * Sends a GetMeta request to a delegate endpoint. Nothing happens if the controller is
 * stopped.
 * <p>
 * Before sending, the endpoint's metadata request timer is started and the endpoint is
 * flagged as waiting for a response. If the delegate key is known and no statistics are
 * registered under it yet, the statistics objects of the delegate (service info,
 * performance, errors) are created and registered.
 *
 * @param anEndpoint
 *          the delegate endpoint, must not be null
 * @throws AsynchAEException
 *           if the endpoint is null or the request cannot be sent
 */
public void dispatchMetadataRequest(Endpoint anEndpoint) throws AsynchAEException {
  if (isStopped()) {
    return;
  }
  if (anEndpoint == null) {
    throw new AsynchAEException("Controller:" + getComponentName()
            + " Unable To Dispatch GetMeta Request. Provided Endpoint is Invalid (NULL)");
  }
  anEndpoint.startMetadataRequestTimer();
  anEndpoint.setController(this);
  anEndpoint.setWaitingForResponse(true);

  String key = lookUpDelegateKey(anEndpoint.getEndpoint());
  boolean statsMissing = key != null && !delegateStatMap.containsKey(key);
  if (statsMissing) {
    // Build the service info of the delegate from the info carried by the endpoint
    ServiceInfo serviceInfo = anEndpoint.getServiceInfo();
    PrimitiveServiceInfo pServiceInfo = new PrimitiveServiceInfo(serviceInfo.isCASMultiplier(),null);
    pServiceInfo.setBrokerURL(serviceInfo.getBrokerURL());
    pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName());
    pServiceInfo.setState(serviceInfo.getState());
    pServiceInfo.setAnalysisEngineInstanceCount(1);

    ServicePerformance servicePerformance = new ServicePerformance();
    if (anEndpoint.isRemote()) {
      servicePerformance.setRemoteDelegate();
    }

    ServiceErrors serviceErrors = new ServiceErrors();
    serviceErrorMap.put(key, serviceErrors);

    // The three objects are stored together in one array under the delegate key
    delegateStatMap.put(key, new Object[] { pServiceInfo, servicePerformance, serviceErrors });
  }
  getOutputChannel().sendRequest(AsynchAEMessage.GetMeta, anEndpoint);
}
/**
 * Retries a metadata request by dispatching it again to the same endpoint.
 *
 * @param anEndpoint
 *          endpoint to re-send the GetMeta request to
 * @throws AsynchAEException
 *           propagated from {@link #dispatchMetadataRequest(Endpoint)}
 */
public void retryMetadataRequest(Endpoint anEndpoint) throws AsynchAEException {
  this.dispatchMetadataRequest(anEndpoint);
}
/**
 * Sends this controller's aggregate metadata to the given endpoint by delegating to the
 * superclass two-argument {@code sendMetadata}.
 *
 * @param anEndpoint
 *          endpoint that should receive the metadata
 */
public void sendMetadata(Endpoint anEndpoint) {
super.sendMetadata(anEndpoint, aggregateMetadata);
}
/**
 * Returns the latch held by this controller.
 *
 * @return the {@code latch} field
 */
public ControllerLatch getControllerLatch() {
  return this.latch;
}
/**
 * Returns the monitor stored in the superclass {@code monitor} field.
 *
 * @return the current monitor; may be null if none has been set (not verifiable from here)
 */
public Monitor getMonitor() {
return super.monitor;
}
/**
 * Replaces the monitor used by this controller.
 *
 * @param monitor
 *          the monitor to store in the {@code monitor} field
 */
public void setMonitor(Monitor monitor) {
this.monitor = monitor;
}
/**
 * Reacts to a change in the number of consumers reported for a delegate endpoint.
 * <p>
 * When {@code aDelegateCount} is zero, the endpoint (if registered in the destination map)
 * is flagged as having no consumers and every in-process cache entry returned for the
 * endpoint is failed: its timer is cancelled, a {@link UimaEEServiceException} reply is sent
 * to the entry's message origin, and the entry is removed from the in-process cache. When
 * the count is non-zero and the endpoint was flagged as having no consumers, the flag is
 * cleared.
 *
 * @param anEndpoint
 *          name of the delegate endpoint; used as the key into the destination map and the
 *          in-process cache
 * @param aDelegateCount
 *          number of consumers reported for the endpoint
 */
public void handleDelegateLifeCycleEvent(String anEndpoint, int aDelegateCount) {
  // get() yields null for an unknown endpoint name
  Endpoint endpoint = (Endpoint) destinationMap.get(anEndpoint);

  if (aDelegateCount == 0) {
    // The endpoint may be absent from the destination map; cached CASes addressed to it
    // must still be failed, so only the flag update is skipped in that case.
    if (endpoint != null) {
      endpoint.setNoConsumers(true);
    }
    // Get entries from the in-process cache that match the endpoint name
    CacheEntry[] cachedEntries = getInProcessCache().getCacheEntriesForEndpoint(anEndpoint);
    if (cachedEntries != null) {
      for (CacheEntry cachedEntry : cachedEntries) {
        String casReferenceId = cachedEntry.getCasReferenceId();
        String parentCasReferenceId = null;
        try {
          CasStateEntry parentEntry = getLocalCache().getTopCasAncestor(casReferenceId);
          if (parentEntry != null) {
            parentCasReferenceId = parentEntry.getCasReferenceId();
          }
        } catch (Exception e) {
          // A missing parent is not fatal: the reply is sent with a null parent id
          System.out.println("Controller:" + getComponentName() + " Parent CAS For CAS:"
                  + casReferenceId + " Not Found In Cache");
        }
        // Guard the lookup so that a missing per-CAS endpoint does not prevent the error
        // reply and the cache removal below
        Endpoint pendingEndpoint = getInProcessCache().getEndpoint(anEndpoint, casReferenceId);
        if (pendingEndpoint != null) {
          pendingEndpoint.cancelTimer();
        }
        Endpoint requestOrigin = cachedEntry.getMessageOrigin();
        try {
          getOutputChannel().sendReply(
                  new UimaEEServiceException("Delegates Not Found To Process CAS on Endpoint:"
                          + anEndpoint), casReferenceId, parentCasReferenceId, requestOrigin,
                  AsynchAEMessage.Process);
        } catch (Exception e) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                    "handleDelegateLifeCycleEvent", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_no_consumers__INFO", new Object[] { casReferenceId, anEndpoint });
          }
        } finally {
          // The entry is dropped whether or not the reply could be delivered
          getInProcessCache().remove(casReferenceId);
        }
      }
    }
  } else if (endpoint != null && endpoint.hasNoConsumers()) {
    // At least one consumer is available
    endpoint.setNoConsumers(false);
  }
}
/**
 * Re-sends the last command to the given endpoint. A Process command is re-sent for the
 * given CAS reference id; any other command is re-sent as a plain command request.
 * <p>
 * The retry is best-effort: an {@link AsynchAEException} raised while sending is logged at
 * WARNING level and not propagated to the caller.
 *
 * @param aCommand
 *          command code to retry, compared against {@code AsynchAEMessage.Process}
 * @param anEndpoint
 *          endpoint to send the request to
 * @param aCasReferenceId
 *          id of the CAS to re-send; only used for Process commands
 */
public void retryLastCommand(int aCommand, Endpoint anEndpoint, String aCasReferenceId) {
  try {
    if (AsynchAEMessage.Process == aCommand) {
      getOutputChannel().sendRequest(aCasReferenceId, anEndpoint);
    } else {
      getOutputChannel().sendRequest(aCommand, anEndpoint);
    }
  } catch (AsynchAEException e) {
    // Nothing is rethrown (callers do not expect a failure here), but the failed retry
    // must leave a trace instead of disappearing silently.
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      UIMAFramework.getLogger(CLASS_NAME).log(Level.WARNING,
              "Controller:" + getComponentName() + " Unable To Retry Command:" + aCommand
                      + " For CAS:" + aCasReferenceId, e);
    }
  }
}
/**
 * Looks up the error statistics registered for a delegate.
 *
 * @param aDelegateKey
 *          key of the delegate
 * @return the {@link ServiceErrors} stored for the key, or null when the map has no entry
 *         for it
 */
public ServiceErrors getServiceErrors(String aDelegateKey) {
  // A single lookup covers both cases: an absent key yields null
  return (ServiceErrors) serviceErrorMap.get(aDelegateKey);
}
/**
 * Returns the service info of this aggregate, creating and populating it on first use.
 * <p>
 * A top level component with an available input channel reports the channel name and the
 * superclass broker URL; otherwise the controller name and "vm://localhost" are used. The
 * deployment descriptor path is taken from the superclass {@code aeDescriptor} field.
 *
 * @return the cached {@link AggregateServiceInfo} instance
 */
public AggregateServiceInfo getServiceInfo() {
  if (serviceInfo != null) {
    return serviceInfo;
  }
  // The field is assigned before it is configured, exactly as before
  serviceInfo = new AggregateServiceInfo(isCasMultiplier(), this);

  // For a top level service whose input channel is not yet initialized, getInputChannel()
  // is expected to block on the latch (per the original note; not verifiable from here)
  final boolean hasOwnInputChannel = isTopLevelComponent() && getInputChannel() != null;
  if (!hasOwnInputChannel) {
    serviceInfo.setInputQueueName(getName());
    serviceInfo.setBrokerURL("vm://localhost");
  } else {
    serviceInfo.setInputQueueName(getInputChannel().getName());
    serviceInfo.setBrokerURL(super.getBrokerURL());
  }
  serviceInfo.setDeploymentDescriptorPath(super.aeDescriptor);
  return serviceInfo;
}
/**
 * Returns the performance statistics of a delegate by delegating to
 * {@code getDelegateServicePerformance}.
 *
 * @param aDelegateKey
 *          key of the delegate
 * @return whatever {@code getDelegateServicePerformance} returns for the key
 */
public ServicePerformance getServicePerformance(String aDelegateKey) {
  return this.getDelegateServicePerformance(aDelegateKey);
}
/**
 * Adds the given amount to the analysis time accumulated for this aggregate. The method is
 * synchronized on this controller instance.
 *
 * @param anAnalysisTime
 *          amount of analysis time to add (unit not visible from this method)
 */
public synchronized void incrementAnalysisTime(long anAnalysisTime) {
  this.servicePerformance.incrementAnalysisTime(anAnalysisTime);
}
/**
 * Cancels the timer of every non-null endpoint registered in the destination map.
 */
public void stopTimers() {
  // Only the values are needed, so the keys are not visited
  for (Object registered : destinationMap.values()) {
    Endpoint endpoint = (Endpoint) registered;
    if (endpoint == null) {
      continue;
    }
    endpoint.cancelTimer();
  }
}
/**
 * Reports whether {@link #setRequestForMetaSentToRemotes()} has been called.
 *
 * @return current value of the {@code requestForMetaSentToRemotes} flag
 */
public boolean requestForMetaSentToRemotes() {
  return this.requestForMetaSentToRemotes;
}
/**
 * Sets the {@code requestForMetaSentToRemotes} flag to true. This class section offers no
 * way to reset it.
 */
public void setRequestForMetaSentToRemotes() {
  this.requestForMetaSentToRemotes = true;
}
/**
 * Releases the state held by this controller: clears its maps and lists and destroys the
 * flow controller container. Every member is checked for null first so that cleanup of a
 * partially initialized controller runs to completion.
 */
public void cleanUp() {
  if (flowMap != null) {
    flowMap.clear();
  }
  if (destinationMap != null) {
    destinationMap.clear();
  }
  if (destinationToKeyMap != null) {
    destinationToKeyMap.clear();
  }
  if (analysisEngineMetaDataMap != null) {
    analysisEngineMetaDataMap.clear();
  }
  if (remoteCasMultiplierList != null) {
    remoteCasMultiplierList.clear();
  }
  if (originMap != null) {
    originMap.clear();
  }
  if (childControllerList != null) {
    // Same lock as the one taken in onInitialize() while iterating the list
    synchronized (childControllerList) {
      childControllerList.clear();
    }
  }
  if (delegateStats != null) {
    delegateStats.clear();
  }
  if (flowControllerContainer != null) {
    synchronized (flowControllerContainer) {
      flowControllerContainer.destroy();
    }
  }
  // Guarded like every other member: a null here would otherwise abort the cleanup before
  // the remaining collections are cleared
  if (perCasStatistics != null) {
    perCasStatistics.clear();
  }
  if (disabledDelegateList != null) {
    disabledDelegateList.clear();
  }
  if (delegateStatMap != null) {
    delegateStatMap.clear();
  }
}
/**
 * Stops the controller: runs the superclass stop logic first, then releases the state held
 * by this class through {@link #cleanUp()}.
 */
public void stop() {
  super.stop();
  cleanUp();
}
/**
 * Returns the list of child controllers. The internal list itself is returned, not a copy.
 *
 * @return the {@code childControllerList} field
 */
public List getChildControllerList() {
  return this.childControllerList;
}
/**
 * Forwards the post-initialization notification to every child controller. The child
 * controller list is locked for the duration of the iteration.
 */
public void onInitialize() {
  synchronized (childControllerList) {
    // Iterating an empty list is a no-op, so no explicit size check is needed
    for (AnalysisEngineController delegateController : childControllerList) {
      delegateController.onInitialize();
    }
  }
}
/**
 * Returns the local cache held by this controller.
 *
 * @return the {@code localCache} field
 */
public LocalCache getLocalCache() {
  return this.localCache;
}
/**
 * Finds the {@link Delegate} registered under the given key.
 *
 * @param aDelegateKey
 *          key to search for
 * @return the first delegate whose key equals {@code aDelegateKey}, or null if none matches
 */
public Delegate lookupDelegate(String aDelegateKey) {
  Delegate found = null;
  for (Delegate candidate : delegates) {
    if (candidate.getKey().equals(aDelegateKey)) {
      found = candidate;
      break;
    }
  }
  return found;
}
}