| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.uima.aae.controller; |
| |
| import java.io.ByteArrayInputStream; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Semaphore; |
| |
| import org.apache.uima.UIMAFramework; |
| import org.apache.uima.aae.AsynchAECasManager; |
| import org.apache.uima.aae.InProcessCache; |
| import org.apache.uima.aae.InputChannel; |
| import org.apache.uima.aae.UIMAEE_Constants; |
| import org.apache.uima.aae.UimaClassFactory; |
| import org.apache.uima.aae.InProcessCache.CacheEntry; |
| import org.apache.uima.aae.controller.LocalCache.CasStateEntry; |
| import org.apache.uima.aae.delegate.ControllerDelegate; |
| import org.apache.uima.aae.delegate.Delegate; |
| import org.apache.uima.aae.error.AsynchAEException; |
| import org.apache.uima.aae.error.ErrorContext; |
| import org.apache.uima.aae.error.ErrorHandler; |
| import org.apache.uima.aae.error.ServiceShutdownException; |
| import org.apache.uima.aae.error.UimaEEServiceException; |
| import org.apache.uima.aae.error.UnknownDestinationException; |
| import org.apache.uima.aae.jmx.AggregateServiceInfo; |
| import org.apache.uima.aae.jmx.JmxManagement; |
| import org.apache.uima.aae.jmx.PrimitiveServiceInfo; |
| import org.apache.uima.aae.jmx.ServiceErrors; |
| import org.apache.uima.aae.jmx.ServiceInfo; |
| import org.apache.uima.aae.jmx.ServicePerformance; |
| import org.apache.uima.aae.message.AsynchAEMessage; |
| import org.apache.uima.aae.monitor.Monitor; |
| import org.apache.uima.aae.monitor.statistics.LongNumericStatistic; |
| import org.apache.uima.aae.monitor.statistics.Statistic; |
| import org.apache.uima.aae.spi.transport.UimaMessage; |
| import org.apache.uima.aae.spi.transport.UimaTransport; |
| import org.apache.uima.analysis_engine.AnalysisEngineDescription; |
| import org.apache.uima.analysis_engine.AnalysisEngineProcessException; |
| import org.apache.uima.analysis_engine.asb.impl.FlowContainer; |
| import org.apache.uima.analysis_engine.asb.impl.FlowControllerContainer; |
| import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData; |
| import org.apache.uima.cas.CAS; |
| import org.apache.uima.flow.FinalStep; |
| import org.apache.uima.flow.ParallelStep; |
| import org.apache.uima.flow.SimpleStep; |
| import org.apache.uima.flow.Step; |
| import org.apache.uima.resource.ResourceInitializationException; |
| import org.apache.uima.resource.metadata.ProcessingResourceMetaData; |
| import org.apache.uima.resource.metadata.ResourceMetaData; |
| import org.apache.uima.util.Level; |
| import org.apache.uima.util.XMLInputSource; |
| |
| public class AggregateAnalysisEngineController_impl extends BaseAnalysisEngineController implements |
| AggregateAnalysisEngineController, AggregateAnalysisEngineController_implMBean { |
| |
| /** |
| * |
| */ |
| private static final long serialVersionUID = 1L; |
| |
| private static final Class CLASS_NAME = AggregateAnalysisEngineController_impl.class; |
| |
| private static final int SERVICE_INFO_INDX = 0; |
| |
| private static final int SERVICE_PERF_INDX = 1; |
| |
| private static final int SERVICE_ERROR_INDX = 2; |
| |
| private ConcurrentHashMap flowMap = new ConcurrentHashMap(); |
| |
| private ConcurrentHashMap destinationMap; |
| |
| private Map destinationToKeyMap; |
| |
| private volatile boolean typeSystemsMerged = false; |
| |
| private AnalysisEngineMetaData aggregateMetadata; |
| |
| private HashMap analysisEngineMetaDataMap = new HashMap(); |
| |
| private List disabledDelegateList = new ArrayList(); |
| |
| private List remoteCasMultiplierList = new ArrayList(); |
| |
| private String descriptor; |
| |
| private transient FlowControllerContainer flowControllerContainer; |
| |
| private String flowControllerDescriptor; |
| |
| private ConcurrentHashMap originMap = new ConcurrentHashMap(); |
| |
| private String controllerBeanName = null; |
| |
| private String serviceEndpointName = null; |
| |
| protected volatile boolean initialized = false; |
| |
| protected List<AnalysisEngineController> childControllerList = new ArrayList<AnalysisEngineController>(); |
| |
| private ConcurrentHashMap delegateStats = new ConcurrentHashMap(); |
| |
| private AggregateServiceInfo serviceInfo = null; |
| |
| private int remoteIndex = 1; |
| |
| private volatile boolean requestForMetaSentToRemotes = false; |
| |
| private ConcurrentHashMap<String, Object[]> delegateStatMap = new ConcurrentHashMap<String, Object[]>(); |
| |
| public final Object parallelStepMux = new Object(); |
| |
| // prevents more than one thread to call collectionProcessComplete on the FC |
| private volatile boolean doSendCpcReply = false; |
| // Guards FC's next() method. Single thread access is allowed at any given time |
| private Semaphore flowSemaphore = new Semaphore(1); |
| |
| /** |
| * |
| * @param anEndpointName |
| * @param aDescriptor |
| * @param aCasManager |
| * @param anInProcessCache |
| * @param aDestinationMap |
| * @throws Exception |
| */ |
| public AggregateAnalysisEngineController_impl(String anEndpointName, String aDescriptor, |
| AsynchAECasManager aCasManager, InProcessCache anInProcessCache, Map aDestinationMap) |
| throws Exception { |
| this(null, anEndpointName, aDescriptor, aCasManager, anInProcessCache, aDestinationMap); |
| } |
| |
| /** |
| * |
| * |
| * @param aParentController |
| * @param anEndpointName |
| * @param aDescriptor |
| * @param aCasManager |
| * @param anInProcessCache |
| * @param aDestinationMap |
| * @throws Exception |
| */ |
| public AggregateAnalysisEngineController_impl(AnalysisEngineController aParentController, |
| String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, |
| InProcessCache anInProcessCache, Map aDestinationMap) throws Exception { |
| this(aParentController, anEndpointName, aDescriptor, aCasManager, anInProcessCache, |
| aDestinationMap, null); |
| } |
| |
| public AggregateAnalysisEngineController_impl(AnalysisEngineController aParentController, |
| String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, |
| InProcessCache anInProcessCache, Map aDestinationMap, JmxManagement aJmxManagement) |
| throws Exception { |
| super(aParentController, 0, anEndpointName, aDescriptor, aCasManager, anInProcessCache, |
| aDestinationMap, aJmxManagement); |
| this.initialize(); |
| } |
| |
| public void registerChildController(AnalysisEngineController aChildController, String aDelegateKey) |
| throws Exception { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), |
| "registerChildController", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_register_controller__FINE", |
| new Object[] { getComponentName(), aChildController.getComponentName() }); |
| } |
| synchronized(childControllerList) { |
| childControllerList.add(aChildController); |
| } |
| } |
| |
| public void saveStatsFromService(String aServiceEndpointName, Map aServiceStats) { |
| String delegateKey = lookUpDelegateKey(aServiceEndpointName); |
| delegateStats.put(delegateKey, aServiceStats); |
| } |
| |
| /** |
| * |
| */ |
| public void addMessageOrigin(String aCasReferenceId, Endpoint anEndpoint) { |
| if (anEndpoint == null) { |
| System.out.println("Controller:" + getComponentName() |
| + " Endpoint is NULL. Cas Reference Id:" + aCasReferenceId); |
| } |
| originMap.put(aCasReferenceId, anEndpoint); |
| if (UIMAFramework.getLogger().isLoggable(Level.FINE)) { |
| Iterator it = originMap.keySet().iterator(); |
| StringBuffer sb = new StringBuffer(); |
| while (it.hasNext()) { |
| String key = (String) it.next(); |
| Endpoint e = (Endpoint) originMap.get(key); |
| if (e != null) { |
| sb.append("\t\nCAS:" + key + " Origin:" + e.getEndpoint()); |
| } |
| } |
| /* |
| * UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), |
| * "addMessageOrigin", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| * "UIMAEE_dump_msg_origin__FINE", new Object[] {getComponentName(), sb.toString()}); |
| */ |
| } |
| } |
| |
| public boolean isDelegateDisabled(String aDelegateKey) { |
| if (aDelegateKey == null) { |
| return false; |
| } |
| Iterator it = disabledDelegateList.iterator(); |
| while (it.hasNext()) { |
| if (aDelegateKey.equalsIgnoreCase((String) it.next())) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * |
| * @param anEndpointName |
| */ |
| public void setServiceEndpointName(String anEndpointName) { |
| serviceEndpointName = anEndpointName; |
| if (this.isTopLevelComponent()) { |
| // This is done so that the collocated client application can determine where to send messages |
| System.setProperty("ServiceInputQueueName", serviceEndpointName); |
| } |
| } |
| |
| /** |
| * |
| */ |
| public String getServiceEndpointName() { |
| return serviceEndpointName; |
| } |
| |
| /** |
| * |
| * @param aBeanName |
| */ |
| public void setControllerBeanName(String aBeanName) { |
| controllerBeanName = aBeanName; |
| if (this.isTopLevelComponent()) { |
| System.setProperty("Controller", controllerBeanName); |
| } |
| } |
| |
| /** |
| * |
| */ |
| public Endpoint getMessageOrigin(String aCasReferenceId) { |
| if (originMap.containsKey(aCasReferenceId)) { |
| return (Endpoint) originMap.get(aCasReferenceId); |
| } |
| return null; |
| } |
| |
| public void removeMessageOrigin(String aCasReferenceId) { |
| if (originMap.containsKey(aCasReferenceId)) { |
| originMap.remove(aCasReferenceId); |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "removeMessageOrigin", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_remove_msg_origin_entry__FINEST", |
| new Object[] { getComponentName(), aCasReferenceId }); |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| public void dropCAS(String aCasReferenceId, boolean dropCacheEntry) { |
| |
| FlowContainer flow = lookupFlow(aCasReferenceId); |
| if (flow != null) { |
| flowMap.remove(aCasReferenceId); |
| } |
| super.dropCAS(aCasReferenceId, dropCacheEntry); |
| } |
| |
| public void dropFlow(String aCasReferenceId, boolean abortFlow) { |
| FlowContainer flow = lookupFlow(aCasReferenceId); |
| if (flow != null) { |
| if (abortFlow) { |
| synchronized (flowControllerContainer) { |
| flow.aborted(); |
| } |
| } |
| |
| flowMap.remove(aCasReferenceId); |
| } |
| |
| } |
| |
| /** |
| * |
| */ |
| public void mapEndpointsToKeys(ConcurrentHashMap aDestinationMap) { |
| destinationMap = aDestinationMap; |
| Set set = destinationMap.entrySet(); |
| for (Iterator it = set.iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| Endpoint endpoint = (Endpoint) entry.getValue(); |
| if (endpoint != null) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), |
| "mapEndpointsToKeys", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_endpoint_to_key_map__FINE", |
| new Object[] { getName(), (String) entry.getKey(), endpoint.getEndpoint() }); |
| } |
| if (destinationToKeyMap == null) { |
| destinationToKeyMap = new HashMap(); |
| } |
| // Create and initialize a Delegate object for the endpoint |
| Delegate delegate = new ControllerDelegate((String) entry.getKey(), this); |
| delegate.setCasProcessTimeout(endpoint.getProcessRequestTimeout()); |
| delegate.setGetMetaTimeout(endpoint.getMetadataRequestTimeout()); |
| delegate.setEndpoint(endpoint); |
| // Add new delegate to the global Delegate list |
| delegates.add(delegate); |
| endpoint.setDelegateKey((String) entry.getKey()); |
| destinationToKeyMap.put(endpoint.getEndpoint(), (String) entry.getKey()); |
| } |
| } |
| |
| } |
| |
| /** |
| * Change the CPC status for each delegate in the destination Map. |
| */ |
| private void resetEndpointsCpcStatus() { |
| Set set = destinationMap.entrySet(); |
| for (Iterator it = set.iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| Endpoint endpoint = (Endpoint) entry.getValue(); |
| if (endpoint != null && endpoint.getStatus() == Endpoint.OK) { |
| endpoint.setCompletedProcessingCollection(false); |
| } |
| } |
| } |
| |
| /** |
| * |
| * @return |
| */ |
| private synchronized boolean allDelegatesCompletedCollection() { |
| Set set = destinationMap.entrySet(); |
| for (Iterator it = set.iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| Endpoint endpoint = (Endpoint) entry.getValue(); |
| if (endpoint != null && endpoint.getStatus() == Endpoint.OK |
| && endpoint.completedProcessingCollection() == false) { |
| return false; |
| } |
| } |
| // All delegates replied to CPC, change the status of each delegate |
| // to handle next CPC request. |
| resetEndpointsCpcStatus(); |
| return true; |
| } |
| |
| public Map getDelegateStats() { |
| return delegateStats; |
| } |
| |
| /** |
| * |
| */ |
| public void processCollectionCompleteReplyFromDelegate(String aDelegateKey, boolean sendReply) |
| throws AsynchAEException { |
| |
| try { |
| Endpoint endpoint = (Endpoint) destinationMap.get(aDelegateKey); |
| String key = lookUpDelegateKey(aDelegateKey); |
| if (endpoint == null) { |
| endpoint = (Endpoint) destinationMap.get(key); |
| if (endpoint == null) { |
| throw new AsynchAEException("Unable to find Endpoint Object Using:" + aDelegateKey); |
| } |
| } |
| endpoint.cancelTimer(); |
| endpoint.setCompletedProcessingCollection(true); |
| |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), |
| "processCollectionCompleteReplyFromDelegate", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_recvd_cpc_reply__FINE", |
| new Object[] { key }); |
| } |
| Endpoint cEndpoint = null; |
| // synchronized to prevent more than one thread to call collectionProcessComplete() on |
| // the Flow Controller. |
| if (flowControllerContainer != null) { |
| synchronized (flowControllerContainer) { |
| if (doSendCpcReply == false && sendReply && allDelegatesCompletedCollection() |
| && ((cEndpoint = getClientEndpoint()) != null)) { |
| doSendCpcReply = true; |
| flowControllerContainer.collectionProcessComplete(); |
| } |
| } |
| } |
| // Reply to a client once for each CPC request. doSendCpcReply is volatile thus |
| // no need to synchronize it |
| if (doSendCpcReply) { |
| sendCpcReply(cEndpoint); |
| doSendCpcReply = false; // reset for the next CPC |
| } |
| } catch (Exception e) { |
| throw new AsynchAEException(e); |
| } |
| } |
| |
| private void sendCpcReply(Endpoint aClientEndpoint) throws Exception { |
| Iterator destIterator = destinationMap.keySet().iterator(); |
| while (destIterator.hasNext()) { |
| |
| String key = (String) destIterator.next(); |
| Endpoint endpoint = (Endpoint) destinationMap.get(key); |
| if (endpoint != null) { |
| endpoint.setCompletedProcessingCollection(false); // reset for the next run |
| } |
| logStats(key, getDelegateServicePerformance(key)); |
| |
| } |
| // Log this controller's stats |
| logStats(getComponentName(), servicePerformance); |
| |
| endProcess(AsynchAEMessage.Process); |
| if (aClientEndpoint == null) { |
| aClientEndpoint = getClientEndpoint(); |
| } |
| if (!aClientEndpoint.isRemote()) { |
| UimaTransport transport = getTransport(aClientEndpoint.getEndpoint()); |
| UimaMessage message = transport.produceMessage(AsynchAEMessage.CollectionProcessComplete, |
| AsynchAEMessage.Response, getName()); |
| // Send reply back to the client. Use internal (non-jms) transport |
| transport.getUimaMessageDispatcher(aClientEndpoint.getEndpoint()).dispatch(message); |
| } else { |
| getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, aClientEndpoint); |
| } |
| |
| clearStats(); |
| } |
| |
| /** |
| * |
| * @param aFlowControllerDescriptor |
| */ |
| public synchronized void setFlowControllerDescriptor(String aFlowControllerDescriptor) { |
| flowControllerDescriptor = aFlowControllerDescriptor; |
| } |
| |
| /** |
| * |
| * @param anEndpoint |
| * @throws AsynchAEException |
| */ |
| private void waitUntilAllCasesAreProcessed(Endpoint anEndpoint) throws AsynchAEException { |
| try { |
| boolean cacheNotEmpty = true; |
| boolean shownOnce = false; |
| final Object localMux = new Object(); |
| while (cacheNotEmpty) { |
| InProcessCache cache = getInProcessCache(); |
| if (!shownOnce) { |
| shownOnce = true; |
| cache.dumpContents(getComponentName()); |
| } |
| |
| if (cache.isEmpty()) { |
| cacheNotEmpty = false; |
| } else { |
| synchronized (localMux) { |
| localMux.wait(10); |
| } |
| } |
| } |
| } catch (Exception e) { |
| throw new AsynchAEException(e); |
| } |
| } |
| |
| /** |
| * |
| */ |
| public void takeAction(String anAction, String anEndpointName, ErrorContext anErrorContext) { |
| try { |
| handleAction(anAction, anEndpointName, anErrorContext); |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), |
| "takeAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| } |
| } |
| } |
| |
| public void collectionProcessComplete(Endpoint anEndpoint) throws AsynchAEException { |
| |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_cpc__FINEST", new Object[] { getName() }); |
| } |
| getInProcessCache().dumpContents(getComponentName()); |
| localCache.dumpContents(); |
| |
| cacheClientEndpoint(anEndpoint); |
| |
| // Wait until the entire cache is empty. The cache stores in-process CASes. |
| // When a CAS is processed completly it is removed from the cache. |
| waitUntilAllCasesAreProcessed(anEndpoint); |
| |
| anEndpoint.setCommand(AsynchAEMessage.CollectionProcessComplete); |
| |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_cpc_all_cases_processed__FINEST", new Object[] { getName() }); |
| } |
| // Special case. Check if ALL delegates have been disabled. If so, destinationMap |
| // will be empty. |
| if (destinationMap.size() == 0) { |
| try { |
| sendCpcReply(null); |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), |
| "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| } |
| } |
| } else { |
| Set set = destinationMap.entrySet(); |
| for (Iterator it = set.iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| Endpoint endpoint = (Endpoint) entry.getValue(); |
| if (endpoint != null && endpoint.getStatus() == Endpoint.OK) { |
| |
| if (!endpoint.isRemote()) { |
| try { |
| UimaTransport transport = getTransport(endpoint.getEndpoint()); |
| UimaMessage message = transport |
| .produceMessage(AsynchAEMessage.CollectionProcessComplete, |
| AsynchAEMessage.Request, getName()); |
| // Send reply back to the client. Use internal (non-jms) transport |
| transport.getUimaMessageDispatcher(endpoint.getEndpoint()).dispatch(message); |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), |
| "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| } |
| } |
| } else { |
| getOutputChannel().sendRequest(AsynchAEMessage.CollectionProcessComplete, endpoint); |
| endpoint.startCollectionProcessCompleteTimer(); |
| } |
| } |
| } |
| } |
| } |
| |
| public String getDescriptor() { |
| return descriptor; |
| } |
| |
| public void setDescriptor(String descriptor) { |
| this.descriptor = descriptor; |
| } |
| |
| public boolean isPrimitive() { |
| return false; |
| } |
| |
| public Map getDestinations() { |
| return destinationMap; |
| } |
| |
| public void enableDelegates(List aDelegateList) throws AsynchAEException { |
| try { |
| // flowControllerContainer.addAnalysisEngines(aDelegateList); |
| } catch (Exception e) { |
| throw new AsynchAEException(e); |
| } |
| |
| } |
| |
| public void handleInitializationError(Exception ex) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), |
| "handleInitializationError", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { ex }); |
| } |
| // Any problems in completeInitialization() is a reason to stop |
| notifyListenersWithInitializationStatus(ex); |
| super.stop(); |
| } |
| |
| private void stopListener(String key, Endpoint endpoint) throws Exception { |
| // Stop the Listener on endpoint that has been disabled |
| InputChannel iC = null; |
| String destName = null; |
| if (endpoint.getDestination() != null) { |
| System.out.println("Controller:" + getComponentName() |
| + "-Stopping Listener Thread on Endpoint:" + endpoint.getDestination()); |
| destName = endpoint.getDestination().toString(); |
| iC = getInputChannel(destName); |
| } else { |
| System.out.println("Controller:" + getComponentName() |
| + "-Stopping Listener Thread on Endpoint:" + endpoint.getReplyToEndpoint()); |
| destName = endpoint.getReplyToEndpoint(); |
| iC = getInputChannel(destName); |
| } |
| if (iC != null) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "stopListener", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stopping_listener__INFO", |
| new Object[] { getComponentName(), destName, key }); |
| } |
| iC.destroyListener(destName, key); |
| } |
| } |
| |
| public void disableDelegates(List aDelegateList) throws AsynchAEException { |
| disableDelegates(aDelegateList, null); |
| } |
| |
| protected void disableDelegates(List aDelegateList, String aCasReferenceId) |
| throws AsynchAEException { |
| if (aDelegateList == null) { |
| throw new AsynchAEException("Controller:" + getComponentName() |
| + " Unable To Disable a Delegate. The Delegate List Provided Is Invalid (Null)"); |
| } |
| try { |
| Iterator it = aDelegateList.iterator(); |
| while (it.hasNext()) { |
| String key = (String) it.next(); |
| Endpoint endpoint = lookUpEndpoint(key, false); |
| // if the the current delegate is remote, destroy its listener |
| if (endpoint != null && endpoint.isRemote()) { |
| Delegate delegate = lookupDelegate(key); |
| if (delegate != null) { |
| delegate.cancelDelegateTimer(); |
| } |
| stopListener(key, endpoint); |
| endpoint.setStatus(Endpoint.DISABLED); |
| } |
| System.out.println("Controller:" + getComponentName() + " Disabled Delegate:" + key |
| + " Due to Excessive Errors"); |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), |
| "disableDelegates", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_disable_endpoint__INFO", new Object[] { getComponentName(), key }); |
| } |
| // Change state of the delegate |
| ServiceInfo sf = getDelegateServiceInfo(key); |
| if (sf != null) { |
| sf.setState(ServiceState.DISABLED.name()); |
| } |
| synchronized (disabledDelegateList) { |
| disabledDelegateList.add(key); |
| } |
| |
| } |
| if (flowControllerContainer != null) { |
| try { |
| synchronized (flowControllerContainer) { |
| flowControllerContainer.removeAnalysisEngines(aDelegateList); |
| } |
| } catch (Exception ex) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), |
| "disableDelegates", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { ex }); |
| } |
| if (aCasReferenceId != null) { |
| CasStateEntry parentCasCacheEntry = getLocalCache().getTopCasAncestor(aCasReferenceId); |
| if (parentCasCacheEntry != null && aDelegateList.size() > 0) { |
| String delegateKey = (String) aDelegateList.get(0); |
| System.out.println("Controller:" + getComponentName() |
| + " Terminating Due to FlowController Failure While Disabling Delegate:" |
| + delegateKey + " Cas:" + parentCasCacheEntry.getCasReferenceId()); |
| super.terminate(ex, parentCasCacheEntry.getCasReferenceId()); |
| } else { |
| terminate(); |
| } |
| } else { |
| terminate(); |
| } |
| return; |
| } |
| } |
| if (!initialized && allTypeSystemsMerged()) { |
| try { |
| completeInitialization(); |
| } catch (ResourceInitializationException ex) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), |
| "disableDelegates", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { ex }); |
| } |
| handleInitializationError(ex); |
| return; |
| } |
| } |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), |
| "disableDelegates", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| } |
| throw new AsynchAEException(e); |
| } |
| |
| } |
| |
| public boolean continueOnError(String aCasReferenceId, String aDelegateKey, Exception anException) |
| throws AsynchAEException { |
| if (aDelegateKey == null || aCasReferenceId == null) { |
| return false; |
| } |
| try { |
| FlowContainer flow = lookupFlow(aCasReferenceId); |
| if (flow == null) { |
| return false; |
| } |
| synchronized (flowControllerContainer) { |
| return flow.continueOnFailure(aDelegateKey, anException); |
| } |
| } catch (Exception e) { |
| throw new AsynchAEException(e); |
| } |
| } |
| |
| private FlowContainer lookupFlow(String aCasReferenceId) { |
| if (flowMap != null) { |
| if (flowMap.containsKey(aCasReferenceId)) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "lookupFlow", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_retrieve_flow_object__FINEST", |
| new Object[] { getComponentName(), aCasReferenceId }); |
| } |
| return (FlowContainer) flowMap.get(aCasReferenceId); |
| } |
| } |
| return null; |
| } |
| |
| public String getLastDelegateKeyFromFlow(String anInputCasReferenceId) { |
| return null; |
| } |
| |
| /** |
| * This routine is called to handle CASes produced by a CAS Multiplier. A new CAS needs a flow |
| * object which is produced here from the Flow associated with the input CAS. Once the subflow is |
| * computed, it is cached for future use. |
| * |
| * @param aCAS |
| * - CAS to process |
| * @param anInputCasReferenceId |
| * - reference id of the input CAS |
| * @param aNewCasReferenceId |
| * - reference id of the CAS created by the CAS multiplier |
| * @param newCASProducedBy |
| * - name of the multiplier that created the CAS |
| * @throws AnalysisEngineProcessException |
| * @throws AsynchAEException |
| */ |
| public void process(CAS aCAS, String anInputCasReferenceId, String aNewCasReferenceId, |
| String newCASProducedBy) // throws AnalysisEngineProcessException, AsynchAEException |
| { |
| FlowContainer flow = null; |
| |
| try { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "process", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_lookup_flow__FINE", |
| new Object[] { getComponentName(), anInputCasReferenceId }); |
| } |
| try { |
| // Lookup a Flow object associated with an input CAS. |
| if (flowMap.containsKey(anInputCasReferenceId)) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_retrieve_flow_object__FINEST", |
| new Object[] { getComponentName(), anInputCasReferenceId }); |
| } |
| // Retrieve an input CAS Flow object from the flow cache. This Flow object will be used |
| // to compute |
| // subordinate Flow for the new CAS. |
| flow = (FlowContainer) flowMap.get(anInputCasReferenceId); |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_retrieved_flow_object_ok__FINEST", |
| new Object[] { getComponentName(), anInputCasReferenceId }); |
| } |
| |
| } |
| if (flow != null) { |
| |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINE, |
| CLASS_NAME.getName(), |
| "process", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_lookup_flow_ok__FINE", |
| new Object[] { getComponentName(), aNewCasReferenceId, newCASProducedBy, |
| anInputCasReferenceId, }); |
| } |
| // Compute subordinate Flow from the Flow associated with the |
| // input CAS. |
| synchronized (flowControllerContainer) { |
| flow = flow.newCasProduced(aCAS, newCASProducedBy); |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINE, |
| CLASS_NAME.getName(), |
| "process", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_new_flow_ok__FINE", |
| new Object[] { getComponentName(), aNewCasReferenceId, newCASProducedBy, |
| anInputCasReferenceId, }); |
| } |
| // Check if the local cache already contains an entry for the Cas id. |
| // A colocated Cas Multiplier may have already registered this CAS |
| // in the parent's controller |
| if (localCache.lookupEntry(aNewCasReferenceId) == null) { |
| // Add this Cas Id to the local cache. Every input CAS goes through here |
| CasStateEntry casStateEntry = localCache.createCasStateEntry(aNewCasReferenceId); |
| casStateEntry.setInputCasReferenceId(anInputCasReferenceId); |
| } |
| |
| // Save the subordinate Flow Object in a cache. Flow exists in the |
| // cache until the CAS is fully processed or it is |
| // explicitly deleted when processing of this CAS cannot continue |
| flowMap.put(aNewCasReferenceId, flow); |
| // Register the fact that this is a new CAS and the fact that is was produced |
| // by this aggregate. It is important to register this to determine how to |
| // handle the CAS in delegate Aggregate services. When the CAS is processed |
| // in the Delegate Aggregate, the CAS produced in the parent Aggregate cannot |
| // be dropped in the delegate. Check Final Step logic. |
| getInProcessCache().getCacheEntryForCAS(aNewCasReferenceId).setNewCas(true, |
| getComponentName()); |
| } else { |
| throw new AsynchAEException( |
| "Flow Object Not In Flow Cache. Expected Flow Object in FlowCache for Cas Reference Id:" |
| + anInputCasReferenceId); |
| } |
| |
| } catch (Throwable t) { |
| // Any error here is automatic termination |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", |
| new Object[] { t }); |
| } |
| sendReplyWithShutdownException(anInputCasReferenceId); |
| handleAction(ErrorHandler.TERMINATE, null, null); |
| return; |
| |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINEST, |
| CLASS_NAME.getName(), |
| "process", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_executing_step__FINEST", |
| new Object[] { getComponentName(), aNewCasReferenceId, newCASProducedBy, |
| anInputCasReferenceId, }); |
| } |
| |
| // Continue with Steps. The CAS has been produced by the CAS Multiplier |
| executeFlowStep(flow, aNewCasReferenceId, true); |
| |
| } catch (Exception e) { |
| HashMap map = new HashMap(); |
| map.put(AsynchAEMessage.Command, AsynchAEMessage.Process); |
| handleError(map, e); |
| } |
| |
| } |
| |
| private void sendReplyWithShutdownException(String aCasReferenceId) { |
| try { |
| CasStateEntry casStateEntry = localCache.createCasStateEntry(aCasReferenceId); |
| CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId); |
| Endpoint replyEndpoint = getReplyEndpoint(cacheEntry, casStateEntry); |
| if (replyEndpoint != null) { |
| getOutputChannel().sendReply(new ServiceShutdownException(), aCasReferenceId, null, |
| replyEndpoint, AsynchAEMessage.Process); |
| } |
| } catch (Exception ex) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", |
| new Object[] { ex }); |
| } |
| } |
| } |
| |
| private boolean abortProcessingCas(CasStateEntry casStateEntry, CacheEntry entry) { |
| CasStateEntry parentCasStateEntry = null; |
| try { |
| // Check if this CAS has a parent |
| if (casStateEntry.isSubordinate()) { |
| // Fetch parent's cache entry |
| parentCasStateEntry = getLocalCache().lookupEntry(casStateEntry.getInputCasReferenceId()); |
| // Check the state of the parent CAS. If it is marked as failed, it means that |
| // one of its child CASes failed and error handling was configured to fail the |
| // CAS. Such failure of a child CAS causes a failure of the parent CAS. All child |
| // CASes will be dropped in finalStep() as they come back from delegates. When all are |
| // accounted for and dropped, the parent CAS will be returned back to the client |
| // with an exception. |
| if (parentCasStateEntry.isFailed()) { |
| // Fetch Delegate object for the CM that produced the CAS. The producer key |
| // is associated with a cache entry in the ProcessRequestHandler. Each new CAS |
| // must have a key of a CM that produced it. |
| Delegate delegateCM = lookupDelegate(entry.getCasProducerKey()); |
| if (delegateCM != null && delegateCM.getEndpoint().isCasMultiplier()) { |
| // If the delegate CM is a remote, send a Free CAS notification |
| if (delegateCM.getEndpoint().isRemote()) { |
| parentCasStateEntry.getFreeCasNotificationEndpoint().setCommand(AsynchAEMessage.Stop); |
| getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, entry.getCasReferenceId(), |
| parentCasStateEntry.getFreeCasNotificationEndpoint()); |
| } |
| // Check if a request to stop generation of new CASes from the parent of |
| // this CAS has been sent to the CM. The Delegate object keeps track of |
| // requests to STOP that are sent to the CM. Only one STOP is needed. |
| if (delegateCM.isGeneratingChildrenFrom(parentCasStateEntry.getCasReferenceId())) { |
| // Issue a request to the CM to stop producing new CASes from a given input |
| // CAS |
| stopCasMultiplier(delegateCM, parentCasStateEntry.getCasReferenceId()); |
| } |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINE, |
| CLASS_NAME.getName(), |
| "abortProcessingCas", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_forcing_cas_to_finalstep__FINE", |
| new Object[] { getComponentName(), casStateEntry.getCasReferenceId(), |
| casStateEntry.getSubordinateCasInPlayCount() }); |
| } |
| casStateEntry.setReplyReceived(); |
| // Force the CAS to go to the Final Step where it will be dropped |
| finalStep(new FinalStep(), casStateEntry.getCasReferenceId()); |
| return true; // Done here |
| } |
| } else if (casStateEntry.isFailed()) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINE, |
| CLASS_NAME.getName(), |
| "abortProcessingCas", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_forcing_cas_to_finalstep__FINE", |
| new Object[] { getComponentName(), casStateEntry.getCasReferenceId(), |
| casStateEntry.getSubordinateCasInPlayCount() }); |
| } |
| casStateEntry.setReplyReceived(); |
| // move this CAS to the final step |
| finalStep(new FinalStep(), casStateEntry.getCasReferenceId()); |
| return true; |
| } |
| |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), |
| "abortProcessingCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * This is a process method that is executed for CASes not created by a Multiplier in this |
| * aggregate. |
| * |
| */ |
| public void process(CAS aCAS, String aCasReferenceId) { |
| boolean handlingDelayedStep = false; |
| // First check if there are outstanding steps to be called before consulting the Flow |
| // Controller. |
| // This could be the case if a previous step was a parallel step and it contained collocated |
| // delegates. |
| if (!isStopped()) { |
| if (abortGeneratingCASes(aCasReferenceId)) { |
| // Force delegate Cas Multipliers to Stop generating new CASes |
| super.stopCasMultipliers(); |
| } |
| try { |
| CacheEntry entry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId); |
| CasStateEntry casStateEntry = getLocalCache().lookupEntry(aCasReferenceId); |
| // Check if this CAS should be aborted due to previous error on this CAS or its |
| // parent. If this is the case the method will move the CAS to the final state |
| // where it will be dropped. If the CAS is an input CAS, it will be returned to |
| // the client with an exception |
| if (abortProcessingCas(casStateEntry, entry)) { |
| // This CAS was aborted, we are done here |
| return; |
| } |
| // Check if this is an input CAS from the client. If not, check if last |
| // delegate handling this CAS was a Cas Multiplier configured to process |
| // parent CAS last |
| if (casStateEntry.getLastDelegate() != null) { |
| // Fetch the endpoint corresponding to the last Delegate handling the CAS |
| Endpoint lastDelegateEndpoint = casStateEntry.getLastDelegate().getEndpoint(); |
| // Check if this delegate is a Cas Multiplier and the parent CAS is to be processed last |
| casStateEntry.setReplyReceived(); |
| if (lastDelegateEndpoint.isCasMultiplier() && lastDelegateEndpoint.processParentLast()) { |
| synchronized (super.finalStepMux) { |
| // Determine if the CAS should be held until all its children leave this aggregate. |
| if (casStateEntry.getSubordinateCasInPlayCount() > 0) { |
| // This input CAS has child CASes still in play. It will remain in the cache |
| // until the last of the child CASes is released. Only than, the input CAS is |
| // is allowed to continue into the next step in the flow. |
| // The CAS has to be in final state |
| casStateEntry.setState(CacheEntry.FINAL_STATE); |
| // The input CAS will be interned until all children leave this aggregate |
| return; |
| } |
| } |
| } |
| } |
| // if we are here entry is not null. The above throws an exception if an entry is not |
| // found in the cache. First check if there is a delayedSingleStepList in the cache. |
| // If there is one, it means that a parallel step contained collocated delegate(s) |
| // The parallel step may only contain remote delegates. All collocated delegates |
| // were removed from the parallel step and added to the delayedSingleStepList in |
| // parallelStep() method. |
| List delayedSingleStepList = entry.getDelayedSingleStepList(); |
| if (delayedSingleStepList != null && delayedSingleStepList.size() > 0) { |
| handlingDelayedStep = true; |
| // Reset number of parallel delegates back to one. This is done only if the previous step |
| // was a parallel step. |
| synchronized (parallelStepMux) { |
| if (casStateEntry.getNumberOfParallelDelegates() > 1) { |
| casStateEntry.setNumberOfParallelDelegates(1); |
| } |
| } |
| // Remove a delegate endpoint from the single step list cached in the CAS entry |
| Endpoint endpoint = (Endpoint_impl) entry.getDelayedSingleStepList().remove(0); |
| // send the CAS to a collocated delegate from the delayed single step list. |
| dispatchProcessRequest(aCasReferenceId, endpoint, true); |
| } |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", |
| new Object[] { e }); |
| } |
| } finally { |
| // If just handled the delayed step, return as there is nothing else to do |
| if (handlingDelayedStep) { |
| return; |
| } |
| } |
| } |
| |
| FlowContainer flow = null; |
| try { |
| if (aCasReferenceId != null) { |
| try { |
| // Check if a Flow object has been previously generated for the Cas. |
| if (flowMap.containsKey(aCasReferenceId)) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_retrieve_flow_object__FINEST", |
| new Object[] { getComponentName(), aCasReferenceId }); |
| } |
| flow = (FlowContainer) flowMap.get(aCasReferenceId); |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_retrieved_flow_object_ok__FINEST", |
| new Object[] { getComponentName(), aCasReferenceId }); |
| } |
| } else { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_new_flow_object__FINEST", new Object[] { aCasReferenceId }); |
| } |
| synchronized (flowControllerContainer) { |
| flow = flowControllerContainer.computeFlow(aCAS); |
| } |
| // Save the Flow Object in a cache. Flow exists in the cache |
| // until the CAS is fully processed or it is |
| // explicitly deleted when processing of this CAS cannot |
| // continue |
| flowMap.put(aCasReferenceId, flow); |
| // Check if the local cache already contains an entry for the Cas id. |
| // A colocated Cas Multiplier may have already registered this CAS |
| // in the parent's controller |
| if (localCache.lookupEntry(aCasReferenceId) == null) { |
| // Add this Cas Id to the local cache. Every input CAS goes through here |
| localCache.createCasStateEntry(aCasReferenceId); |
| } |
| } |
| } catch (Exception ex) { |
| // Any error here is automatic termination |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), |
| "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { ex }); |
| } |
| sendReplyWithShutdownException(aCasReferenceId); |
| |
| handleAction(ErrorHandler.TERMINATE, null, null); |
| return; |
| } |
| if (!isStopped()) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_executing_step_input_cas__FINEST", |
| new Object[] { getComponentName(), aCasReferenceId }); |
| } |
| // Execute a step in the flow. false means that this CAS has not |
| // been produced by CAS Multiplier |
| executeFlowStep(flow, aCasReferenceId, false); |
| } else { |
| synchronized (flowControllerContainer) { |
| flow.aborted(); |
| } |
| } |
| } |
| } catch (Exception e) { |
| HashMap map = new HashMap(); |
| map.put(AsynchAEMessage.Command, AsynchAEMessage.Process); |
| map.put(AsynchAEMessage.CasReference, aCasReferenceId); |
| handleError(map, e); |
| } |
| } |
| |
| private void simpleStep(SimpleStep aStep, String aCasReferenceId)// throws AsynchAEException |
| { |
| Endpoint endpoint = null; |
| try { |
| String analysisEngineKey = aStep.getAnalysisEngineKey(); |
| // Find the endpoint for the delegate |
| endpoint = lookUpEndpoint(analysisEngineKey, true); |
| if (endpoint != null) { |
| endpoint.setController(this); |
| CasStateEntry casStateEntry = getLocalCache().lookupEntry(aCasReferenceId); |
| |
| if (endpoint.isCasMultiplier()) { |
| Delegate delegateCM = lookupDelegate(analysisEngineKey); |
| delegateCM.setGeneratingChildrenFrom(aCasReferenceId, true); |
| // Record the outgoing CAS. CASes destined for remote CM are recorded |
| // in JmsOutputchannel. |
| if (!endpoint.isRemote()) { |
| delegateCM.addNewCasToOutstandingList(aCasReferenceId, true); |
| } |
| } |
| |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "simpleStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_next_step__FINEST", new Object[] {analysisEngineKey, aCasReferenceId }); |
| } |
| |
| // Reset number of parallel delegates back to one. This is done only if the previous step |
| // was a parallel step. |
| synchronized (parallelStepMux) { |
| if (casStateEntry.getNumberOfParallelDelegates() > 1) { |
| casStateEntry.setNumberOfParallelDelegates(1); |
| } |
| } |
| if (!isStopped()) { |
| // Start a timer for this request. The amount of time to wait |
| // for response is provided in configuration for this endpoint |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "simpleStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_next_step_dispatch__FINEST", |
| new Object[] { getComponentName(), analysisEngineKey, aCasReferenceId }); |
| } |
| dispatchProcessRequest(aCasReferenceId, endpoint, true); |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "simpleStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_next_step_dispatch_completed__FINEST", |
| new Object[] { getComponentName(), analysisEngineKey, aCasReferenceId }); |
| } |
| } |
| } |
| } catch (Throwable e) { |
| HashMap map = new HashMap(); |
| map.put(AsynchAEMessage.Command, AsynchAEMessage.Process); |
| map.put(AsynchAEMessage.CasReference, aCasReferenceId); |
| if (endpoint != null) { |
| map.put(AsynchAEMessage.Endpoint, endpoint); |
| } |
| handleError(map, e); |
| } |
| |
| } |
| |
| private void parallelStep(ParallelStep aStep, String aCasReferenceId) throws AsynchAEException { |
| try { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "parallelStep", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_parallel_step__FINE", |
| new Object[] { getComponentName(), aCasReferenceId }); |
| } |
| Collection keyList = aStep.getAnalysisEngineKeys(); |
| String[] analysisEngineKeys = new String[keyList.size()]; |
| keyList.toArray(analysisEngineKeys); |
| List parallelDelegateList = new ArrayList(); |
| List singleStepDelegateList = null; |
| // Only remote delegates can be in a parallel step. Iterate over the |
| // delegates in parallel step and assign each to a different list based on location. |
| // Remote delegates are assigned to parallelDelegateList, whereas co-located |
| // delegates are assigned to singleStepDelegateList. Those delegates |
| // assigned to the singleStepDelegateList will be executed sequentially |
| // once all parallel delegates respond. |
| for (int i = 0; i < analysisEngineKeys.length; i++) { |
| // Fetch an endpoint corresponding to a given delegate key |
| Endpoint endpoint = lookUpEndpoint(analysisEngineKeys[i], true); |
| endpoint.setController(this); |
| // Assign delegate to appropriate list |
| if (endpoint.isRemote()) { |
| parallelDelegateList.add(endpoint); |
| } else { |
| if (singleStepDelegateList == null) { |
| singleStepDelegateList = new ArrayList(); |
| } |
| singleStepDelegateList.add(endpoint); |
| if (UIMAFramework.getLogger().isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), |
| "parallelStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_move_to_single_step_list__FINE", |
| new Object[] { getComponentName(), analysisEngineKeys[i], aCasReferenceId }); |
| } |
| } |
| } |
| // Fetch cache entry for a given CAS id |
| CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId); |
| CasStateEntry casStateEntry = getLocalCache().lookupEntry(aCasReferenceId); |
| |
| // Add all co-located delegates to the cache. These delegates will be called |
| // sequentially once all parallel delegates respond |
| if (singleStepDelegateList != null) { |
| // Add a list containing single step delegates to the cache |
| // These delegates will be called sequentially when all parallel |
| // delegates respond. |
| cacheEntry.setDelayedSingleStepList(singleStepDelegateList); |
| } |
| // Check if there are any delegates in the parallel step. It is possible that |
| // all of the delegates were co-located and thus the parallel delegate list |
| // is empty. |
| if (parallelDelegateList.size() > 0) { |
| // Create endpoint array to contain as many slots as there are parallel delegates |
| Endpoint[] endpoints = new Endpoint_impl[parallelDelegateList.size()]; |
| // Copy parallel delegate endpoints to the array |
| parallelDelegateList.toArray(endpoints); |
| synchronized (parallelStepMux) { |
| casStateEntry.resetDelegateResponded(); |
| // Set number of delegates in the parallel step |
| casStateEntry.setNumberOfParallelDelegates(endpoints.length); |
| } |
| // Dispatch CAS to remote parallel delegates |
| dispatchProcessRequest(aCasReferenceId, endpoints, true); |
| } else { |
| // All delegates in a parallel step are co-located. Send the CAS |
| // to the first delegate in the single step list. |
| process(null, aCasReferenceId); |
| } |
| } catch (Throwable e) { |
| HashMap map = new HashMap(); |
| map.put(AsynchAEMessage.Command, AsynchAEMessage.Process); |
| map.put(AsynchAEMessage.CasReference, aCasReferenceId); |
| handleError(map, e); |
| } |
| } |
| |
| public void sendRequestForMetadataToRemoteDelegates() throws AsynchAEException { |
| synchronized(childControllerList) { |
| if ( childControllerList.size() > 0 ) { |
| for( AnalysisEngineController childController : childControllerList ) { |
| if (childController instanceof AggregateAnalysisEngineController) { |
| ((AggregateAnalysisEngineController) childController) |
| .sendRequestForMetadataToRemoteDelegates(); |
| } |
| } |
| } |
| } |
| Endpoint[] delegateEndpoints = new Endpoint[destinationMap.size()]; |
| |
| // First copy endpoints to an array so that we dont get Concurrent access problems |
| // in case an error handler needs to disable the endpoint. |
| Set keySet = destinationMap.keySet(); |
| Iterator it = keySet.iterator(); |
| int indx = 0; |
| while (it.hasNext()) { |
| delegateEndpoints[indx++] = (Endpoint) destinationMap.get((String) it.next()); |
| } |
| // Now send GetMeta request to all remote delegates |
| for (int i = 0; !isStopped() && i < delegateEndpoints.length; i++) { |
| if (delegateEndpoints[i].isRemote()) { |
| delegateEndpoints[i].initialize(); |
| delegateEndpoints[i].setController(this); |
| String key = lookUpDelegateKey(delegateEndpoints[i].getEndpoint()); |
| if (key != null && destinationMap.containsKey(key)) { |
| Endpoint endpoint = ((Endpoint) destinationMap.get(key)); |
| if (key != null && endpoint != null) { |
| ServiceInfo serviceInfo = endpoint.getServiceInfo(); |
| PrimitiveServiceInfo pServiceInfo = new PrimitiveServiceInfo(endpoint.isCasMultiplier(), null); |
| pServiceInfo.setBrokerURL(serviceInfo.getBrokerURL()); |
| pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName()); |
| if (endpoint.getDestination() != null) { |
| pServiceInfo.setReplyQueueName(endpoint.getDestination().toString()); |
| } |
| pServiceInfo.setServiceKey(key); |
| pServiceInfo.setState(serviceInfo.getState()); |
| pServiceInfo.setAnalysisEngineInstanceCount(1); |
| |
| registerWithAgent(pServiceInfo, super.getManagementInterface().getJmxDomain() |
| + super.jmxContext + ",r" + remoteIndex + "=" + key |
| + " [Remote Uima EE Service],name=" + key + "_" + serviceInfo.getLabel()); |
| |
| ServicePerformance servicePerformance = new ServicePerformance(); |
| // servicePerformance.setIdleTime(System.nanoTime()); |
| servicePerformance.setRemoteDelegate(); |
| |
| registerWithAgent(servicePerformance, super.getManagementInterface().getJmxDomain() |
| + super.jmxContext + ",r" + remoteIndex + "=" + key |
| + " [Remote Uima EE Service],name=" + key + "_" + servicePerformance.getLabel()); |
| |
| ServiceErrors serviceErrors = new ServiceErrors(); |
| |
| registerWithAgent(serviceErrors, super.getManagementInterface().getJmxDomain() |
| + super.jmxContext + ",r" + remoteIndex + "=" + key |
| + " [Remote Uima EE Service],name=" + key + "_" + serviceErrors.getLabel()); |
| remoteIndex++; |
| |
| serviceErrorMap.put(key, serviceErrors); |
| Object[] delegateStatsArray = new Object[] { pServiceInfo, servicePerformance, |
| serviceErrors }; |
| |
| delegateStatMap.put(key, delegateStatsArray); |
| } |
| // If the service has stopped dont bother doing anything else. The service |
| // may have been stopped because listener connection could not be established. |
| if (isStopped()) { |
| return; |
| } |
| if (delegateEndpoints[i].getStatus() == Endpoint.OK ) { |
| dispatchMetadataRequest(delegateEndpoints[i]); |
| } |
| } |
| } |
| |
| else { |
| // collocated delegate |
| delegateEndpoints[i].initialize(); |
| delegateEndpoints[i].setController(this); |
| |
| delegateEndpoints[i].setWaitingForResponse(true); |
| try { |
| UimaMessage message = getTransport(delegateEndpoints[i].getEndpoint()).produceMessage( |
| AsynchAEMessage.GetMeta, AsynchAEMessage.Request, getName()); |
| UimaTransport transport = getTransport(delegateEndpoints[i].getEndpoint()); |
| transport.getUimaMessageDispatcher(delegateEndpoints[i].getEndpoint()).dispatch(message); |
| } catch (Exception e) { |
| throw new AsynchAEException(e); |
| } |
| } |
| } |
| } |
| |
| private CasStateEntry fetchParentCasFromLocalCache(CasStateEntry casStateEntry) throws Exception { |
| // Lookup parent CAS in the local cache |
| CasStateEntry parentCasStateEntry = localCache.lookupEntry(casStateEntry |
| .getInputCasReferenceId()); |
| if (parentCasStateEntry == null) { |
| |
| if (UIMAFramework.getLogger().isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME) |
| .logrb( |
| Level.INFO, |
| CLASS_NAME.getName(), |
| "fetchParentCas", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_cas_not_found__INFO", |
| new Object[] { getComponentName(), casStateEntry.getCasReferenceId(), |
| "Local Cache" }); |
| } |
| } |
| return parentCasStateEntry; |
| } |
| |
| private CacheEntry fetchParentCasFromGlobalCache(CasStateEntry casStateEntry) throws Exception { |
| CacheEntry parentCASCacheEntry = null; |
| try { |
| // Fetch the parent Cas cache entry |
| parentCASCacheEntry = getInProcessCache().getCacheEntryForCAS( |
| casStateEntry.getInputCasReferenceId()); |
| } catch (Exception ex) { |
| |
| if (UIMAFramework.getLogger().isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.INFO, |
| CLASS_NAME.getName(), |
| "fetchParentCas", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_cas_not_found__INFO", |
| new Object[] { getComponentName(), casStateEntry.getInputCasReferenceId(), |
| "InProcessCache" }); |
| } |
| } |
| return parentCASCacheEntry; |
| } |
| |
| private boolean casHasChildrenInPlay(CasStateEntry casStateEntry) throws Exception { |
| if (casStateEntry.getSubordinateCasInPlayCount() > 0) { |
| // This CAS has child CASes still in play. This CAS will remain in the cache |
| // until all its children are fully processed. |
| if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINEST, |
| CLASS_NAME.getName(), |
| "casHasChildrenInPlay", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_final_step_parent_cas_child_count__FINEST", |
| new Object[] { getComponentName(), casStateEntry.getCasReferenceId(), |
| casStateEntry.getSubordinateCasInPlayCount() }); |
| } |
| // Leave input CAS in pending state. It will be returned to the client |
| // *only* if the last subordinate CAS is fully processed. |
| casStateEntry.setPendingReply(true); |
| // Done here. There are subordinate CASes still being processed. |
| return true; |
| } |
| return false; |
| } |
| |
| public void finalStep(FinalStep aStep, String aCasReferenceId) { |
| Endpoint endpoint = null; |
| boolean replySentToClient = false; |
| boolean isSubordinate = false; |
| CacheEntry cacheEntry = null; |
| CasStateEntry casStateEntry = null; |
| CasStateEntry parentCasStateEntry = null; |
| Endpoint freeCasEndpoint = null; |
| CacheEntry parentCASCacheEntry = null; |
| Endpoint cEndpoint = null; |
| boolean casDropped = false; |
| boolean doDecrementChildCount = false; |
| localCache.dumpContents(); |
| |
| // First locate entries in the global and local cache for a given CAS |
| // If not found, log a message and return |
| try { |
| // Get entry from the cache for a given CAS Id. This throws an exception if |
| // an entry doesnt exist in the cache |
| cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId); |
| casStateEntry = localCache.lookupEntry(aCasReferenceId); |
| if (casStateEntry.getState() != CacheEntry.FINAL_STATE) { |
| // Mark the entry to indicate that the CAS reached a final step. This CAS |
| // may still have children and will not be returned to the client until |
| // all of them are fully processed. This state info will aid in the |
| // internal bookkeeping when the final child is processed. |
| casStateEntry.setFinalStep(aStep); |
| casStateEntry.setState(CacheEntry.FINAL_STATE); |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINE, |
| CLASS_NAME.getName(), |
| "finalStep", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_cas_in_finalstep__FINE", |
| new Object[] { getComponentName(), casStateEntry.getCasReferenceId(), |
| casStateEntry.getSubordinateCasInPlayCount() }); |
| } |
| } |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "finalStep", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", |
| new Object[] { e }); |
| } |
| return; |
| } |
| |
| // Found entries in caches for a given CAS id |
| try { |
| endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId); |
| |
| synchronized (super.finalStepMux) { |
| // Check if the global cache still contains the CAS. It may have been deleted by another |
| // thread already |
| if (!getInProcessCache().entryExists(aCasReferenceId)) { |
| return; |
| } |
| // Check if this CAS has children that are still being processed in this aggregate |
| if (casHasChildrenInPlay(casStateEntry)) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINE, |
| CLASS_NAME.getName(), |
| "finalStep", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_cas_has_children__FINE", |
| new Object[] { getComponentName(), casStateEntry.getCasReferenceId(), |
| casStateEntry.getCasReferenceId(), |
| casStateEntry.getSubordinateCasInPlayCount() }); |
| } |
| |
| replySentToClient = false; |
| return; |
| } |
| if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_final_step_parent_cas_no_children__FINEST", |
| new Object[] { getComponentName(), aCasReferenceId }); |
| } |
| // Determine if this CAS is a child of some CAS |
| isSubordinate = casStateEntry.getInputCasReferenceId() != null; |
| |
| if (isSubordinate) { |
| // fetch the destination of a CM that produced this CAS, so that we know where to send |
| // Free Cas Notification |
| freeCasEndpoint = cacheEntry.getFreeCasEndpoint(); |
| parentCasStateEntry = fetchParentCasFromLocalCache(casStateEntry); |
| parentCASCacheEntry = fetchParentCasFromGlobalCache(casStateEntry); |
| doDecrementChildCount = true; |
| } |
| // If the CAS was generated by this component but the Flow Controller wants to drop it OR |
| // this component |
| // is not a Cas Multiplier |
| if (forceToDropTheCas(parentCasStateEntry, cacheEntry, aStep)) { |
| if (casStateEntry.isReplyReceived()) { |
| if (isSubordinate) { |
| // drop the flow since we no longer need it |
| dropFlow(aCasReferenceId, true); |
| // Drop the CAS and remove cache entry for it |
| dropCAS(aCasReferenceId, true); |
| casDropped = true; |
| // If debug level=FINEST dump the entire cache |
| localCache.dumpContents(); |
| // Set this state as if we sent the reply to the client. This triggers a cleanup of |
| // origin map and stats |
| // for the current cas |
| if (isTopLevelComponent()) { |
| replySentToClient = true; |
| } |
| } |
| } else { |
| doDecrementChildCount = false; |
| } |
| } else if (!casStateEntry.isDropped()) { |
| casStateEntry.setWaitingForRelease(true); |
| // Send a reply to the Client. If the CAS is an input CAS it will be dropped |
| cEndpoint = replyToClient(cacheEntry, casStateEntry); |
| replySentToClient = true; |
| if (cEndpoint.isRemote()) { |
| // if this service is a Cas Multiplier don't remove the CAS. It will be removed |
| // when a remote client sends explicit Release CAS Request |
| if (!isCasMultiplier()) { |
| // Drop the CAS and remove cache entry for it |
| dropCAS(aCasReferenceId, true); |
| } |
| casDropped = true; |
| } else { |
| // Remove entry from the local cache for this CAS. If the client |
| // is remote the entry was removed in replyToClient() |
| try { |
| localCache.lookupEntry(aCasReferenceId).setDropped(true); |
| } catch (Exception e) { |
| } |
| localCache.remove(aCasReferenceId); |
| } |
| // If debug level=FINEST dump the entire cache |
| localCache.dumpContents(); |
| } |
| |
| if (parentCasStateEntry == null && isSubordinate) { |
| parentCasStateEntry = localCache.lookupEntry(casStateEntry.getInputCasReferenceId()); |
| } |
| if (doDecrementChildCount) { |
| // Child CAS has been fully processed, decrement its parent count of active child CASes |
| if (parentCasStateEntry != null) { |
| parentCasStateEntry.decrementSubordinateCasInPlayCount(); |
| // If debug level=FINEST dump the entire cache |
| localCache.dumpContents(); |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINE, |
| CLASS_NAME.getName(), |
| "finalStep", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_cas_decremented_child_count__FINE", |
| new Object[] { getComponentName(), casStateEntry.getCasReferenceId(), |
| casStateEntry.getSubordinateCasInPlayCount() }); |
| } |
| } |
| } |
| |
| boolean clientIsCollocated = (cEndpoint == null || !cEndpoint.isRemote()); |
| |
| if (parentCasStateEntry != null && parentCasStateEntry.getSubordinateCasInPlayCount() == 0 |
| && parentCasStateEntry.isFailed()) { |
| parentCasStateEntry.setReplyReceived(); |
| } |
| // For subordinate CAS, check if its parent needs to be put in play. This should happen if |
| // this CAS was the last of the children in play |
| if (isSubordinate && releaseParentCas(casDropped, clientIsCollocated, parentCasStateEntry)) { |
| // Put the parent CAS in play. The parent CAS can be in one of two places now depending |
| // on the configuration. The parent CAS could have been suspended in the final step, or it |
| // could have |
| // been suspended in the process method. If the configuration indicates that the parent |
| // should follow only when the last of its children leaves this aggregate, call the |
| // process method. |
| // Otherwise, the CAS is in a final state and simply needs to resume there. |
| Endpoint lastDelegateEndpoint = casStateEntry.getLastDelegate().getEndpoint(); |
| if (lastDelegateEndpoint.processParentLast()) { |
| // The parent was suspended in the process call. Resume processing the CAS |
| process(parentCASCacheEntry.getCas(), parentCasStateEntry.getCasReferenceId()); |
| } else { |
| // The parent was suspended in the final step. Resume processing the CAS |
| finalStep(parentCasStateEntry.getFinalStep(), parentCasStateEntry.getCasReferenceId()); |
| } |
| } |
| } // synchronized |
| if (endpoint != null) { |
| // remove stats associated with this Cas and a given endpoint |
| dropStats(aCasReferenceId, endpoint.getEndpoint()); |
| } |
| } catch (Exception e) { |
| HashMap map = new HashMap(); |
| map.put(AsynchAEMessage.Command, AsynchAEMessage.Process); |
| map.put(AsynchAEMessage.CasReference, aCasReferenceId); |
| if (endpoint != null) { |
| map.put(AsynchAEMessage.Endpoint, endpoint); |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), |
| "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| } |
| handleError(map, e); |
| } finally { |
| if (replySentToClient) { |
| removeMessageOrigin(aCasReferenceId); |
| dropStats(aCasReferenceId, super.getName()); |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINEST, |
| CLASS_NAME.getName(), |
| "finalStep", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_final_step_show_internal_stats__FINEST", |
| new Object[] { getName(), flowMap.size(), getInProcessCache().getSize(), |
| originMap.size(), super.statsMap.size() }); |
| } |
| // freeCasEndpoint is a special endpoint for sending Free CAS Notification. |
| if (casDropped && freeCasEndpoint != null) { |
| freeCasEndpoint.setReplyEndpoint(true); |
| try { |
| // send Free CAS Notification to a Cas Multiplier |
| getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, aCasReferenceId, |
| freeCasEndpoint); |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), |
| "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| } |
| } |
| } |
| } |
| } |
| |
| public boolean releaseParentCas(boolean casDropped, boolean clientIsCollocated, |
| CasStateEntry parentCasStateEntry) { |
| if (parentCasStateEntry == null) { |
| return false; |
| } |
| |
| // To release the parent CAS, the following conditions must be true |
| boolean retValue = |
| // The child CAS was dropped OR this Aggregate Client is colocated |
| (casDropped || clientIsCollocated) |
| // The parent CAS has been received by this Aggregate |
| && parentCasStateEntry.isReplyReceived() |
| // The CAS has to be in final state |
| && parentCasStateEntry.getState() == CacheEntry.FINAL_STATE |
| // To release the CAS, it may not have any children (subordinate CASes) |
| && parentCasStateEntry.getSubordinateCasInPlayCount() == 0; |
| |
| if (clientEndpoint != null && UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINEST, |
| CLASS_NAME.getName(), |
| "finalStep", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_show_why_not_releasing_parent__FINEST", |
| new Object[] { getComponentName(), parentCasStateEntry.getCasReferenceId(), retValue, |
| casDropped, clientEndpoint.isRemote(), parentCasStateEntry.isReplyReceived(), |
| parentCasStateEntry.isPendingReply(), |
| parentCasStateEntry.getState() == CacheEntry.FINAL_STATE, |
| parentCasStateEntry.getSubordinateCasInPlayCount(), getComponentName() }); |
| } |
| return retValue; |
| } |
| |
| private boolean forceToDropTheCas(CasStateEntry entry, CacheEntry cacheEntry, FinalStep aStep) { |
| // Get the key of the Cas Producer |
| String casProducer = cacheEntry.getCasProducerAggregateName(); |
| // CAS is considered new from the point of view of this service IF it was produced by it |
| boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals( |
| casProducer)); |
| if (entry != null && entry.isFailed() && isNewCas) { |
| return true; // no point to continue if the CAS was produced in this aggregate and its parent |
| // failed here |
| } |
| // If the CAS was generated by this component but the Flow Controller wants to drop the CAS OR |
| // this component |
| // is not a Cas Multiplier |
| if (isNewCas && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) { |
| return true; |
| } |
| return false; |
| } |
| |
| private boolean casHasExceptions(CasStateEntry casStateEntry) { |
| return (casStateEntry.getErrors().size() > 0) ? true : false; |
| } |
| |
| private void sendReplyWithException(CacheEntry acacheEntry, CasStateEntry casStateEntry, |
| Endpoint replyEndpoint) throws Exception { |
| // boolean casProducedInThisAggregate = |
| // getComponentName().equals(cacheEntry.getCasProducerAggregateName()); |
| if (casStateEntry.isSubordinate()) { |
| // We must reply with the input CAS |
| // casStateEntry = getLocalCache().lookupEntry(casStateEntry.getInputCasReferenceId()); |
| casStateEntry = getLocalCache().getTopCasAncestor(casStateEntry.getCasReferenceId()); |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINE, |
| CLASS_NAME.getName(), |
| "sendReplyWithException", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_returning_exception_to_client__FINE", |
| new Object[] { getComponentName(), casStateEntry.getCasReferenceId(), |
| replyEndpoint.getEndpoint() }); |
| } |
| if (replyEndpoint.isRemote()) { |
| // this is an input CAS that has been marked as failed. Return the input CAS |
| // and an exception to the client. |
| getOutputChannel().sendReply(casStateEntry.getErrors().get(0), |
| casStateEntry.getCasReferenceId(), null, replyEndpoint, AsynchAEMessage.Process); |
| } else { |
| replyEndpoint.setReplyEndpoint(true); |
| UimaTransport vmTransport = getTransport(replyEndpoint.getEndpoint()); |
| UimaMessage message = vmTransport.produceMessage(AsynchAEMessage.Process, |
| AsynchAEMessage.Response, this.getName()); |
| message.addIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Exception); |
| message.addStringProperty(AsynchAEMessage.CasReference, casStateEntry.getCasReferenceId()); |
| |
| Throwable wrapper = null; |
| Throwable cause = casStateEntry.getErrors().get(0); |
| if (!(cause instanceof UimaEEServiceException)) { |
| // Strip off AsyncAEException and replace with UimaEEServiceException |
| if (cause instanceof AsynchAEException && cause.getCause() != null) { |
| wrapper = new UimaEEServiceException(cause.getCause()); |
| } else { |
| wrapper = new UimaEEServiceException(cause); |
| } |
| } |
| if (wrapper == null) { |
| message.addObjectProperty(AsynchAEMessage.Cargo, cause); |
| } else { |
| message.addObjectProperty(AsynchAEMessage.Cargo, wrapper); |
| } |
| vmTransport.getUimaMessageDispatcher(replyEndpoint.getEndpoint()).dispatch(message); |
| } |
| } |
| |
| private boolean sendExceptionToClient(CacheEntry cacheEntry, CasStateEntry casStateEntry, |
| Endpoint replyEndpoint) throws Exception { |
| // Dont send CASes to the client if the input CAS is in failed state. One |
| // of the descendant CASes may have failed in one of the delegates. Any |
| // exception on descendant CAS causes the input CAS to be returned to the |
| // client with an exception but only when all its descendant CASes are |
| // accounted for and released. |
| if (casStateEntry.isSubordinate()) { |
| |
| // Fetch the top ancestor CAS of this CAS. |
| CasStateEntry topAncestorCasStateEntry = getLocalCache().getTopCasAncestor( |
| casStateEntry.getInputCasReferenceId()); |
| // check the state |
| if (topAncestorCasStateEntry.isFailed() && casHasExceptions(casStateEntry) |
| && topAncestorCasStateEntry.getSubordinateCasInPlayCount() == 0) { |
| return true; |
| } else { |
| // Add the id of the generated CAS to the map holding outstanding CASes. This |
| // map will be referenced when a client sends Free CAS Notification. The map |
| // stores the id of the CAS both as a key and a value. Map is used to facilitate |
| // quick lookup |
| cmOutstandingCASes |
| .put(casStateEntry.getCasReferenceId(), casStateEntry.getCasReferenceId()); |
| } |
| } else if (casStateEntry.isFailed() && casHasExceptions(casStateEntry)) { |
| return true; |
| } |
| return false; |
| } |
| |
| private void sendReplyToRemoteClient(CacheEntry cacheEntry, CasStateEntry casStateEntry, |
| Endpoint replyEndpoint) throws Exception { |
| if (sendExceptionToClient(cacheEntry, casStateEntry, replyEndpoint)) { |
| sendReplyWithException(cacheEntry, casStateEntry, replyEndpoint); |
| } else { |
| // Send response to a given endpoint |
| getOutputChannel().sendReply(cacheEntry, replyEndpoint); |
| // Drop the CAS only if the client is remote and the CAS is an input CAS. |
| // If this CAS has a parent the client will send Release CAS notification to release the CAS. |
| if (!casStateEntry.isSubordinate()) { |
| dropCAS(casStateEntry.getCasReferenceId(), true); |
| // If the cache is empty change the state of the Aggregate to idle |
| if (getInProcessCache().isEmpty()) { |
| endProcess(AsynchAEMessage.Process); |
| } |
| } |
| } |
| } |
| |
| private void sendReplyToCollocatedClient(CacheEntry cacheEntry, CasStateEntry casStateEntry, |
| Endpoint replyEndpoint) throws Exception { |
| boolean casProducedInThisAggregate = getComponentName().equals( |
| cacheEntry.getCasProducerAggregateName()); |
| boolean isSubordinate = casStateEntry.isSubordinate(); |
| boolean serviceIsCM = isCasMultiplier(); |
| if (sendExceptionToClient(cacheEntry, casStateEntry, replyEndpoint)) { |
| try { |
| sendReplyWithException(cacheEntry, casStateEntry, replyEndpoint); |
| } catch (Exception e) { |
| } finally { |
| if (casProducedInThisAggregate) { |
| // Drop the CAS generated in this Aggregate |
| dropCAS(casStateEntry.getCasReferenceId(), true); |
| } |
| } |
| } else { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINE, |
| CLASS_NAME.getName(), |
| "sendReplyToCollocatedClient", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_sending_reply_to_client__FINE", |
| new Object[] { getComponentName(), casStateEntry.getCasReferenceId(), |
| replyEndpoint.getEndpoint() }); |
| } |
| int mType = AsynchAEMessage.Response; |
| // Check if the CAS was produced in this aggregate by any of its delegates |
| // If so, send the CAS as a request. Otherwise, the CAS is an input CAS and |
| // needs to return as reply. |
| if (isSubordinate && serviceIsCM && casProducedInThisAggregate) { |
| // this is a Cas Multiplier, send this CAS to the client in a request message. |
| mType = AsynchAEMessage.Request; |
| // Return the CAS to the colocated client. First make sure that this CAS |
| // is associated with the input CAS. This CAS may have been produced from |
| // an intermediate CAS (which was produced from the input CAS). From the |
| // client perspective, this Cas Multiplier Aggregate is a black box, |
| // all CASes produced here must be linked with the input CAS. |
| // Find the top ancestor of this CAS. It is the input CAS sent by the client |
| String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry); |
| // Modify the parent of this CAS. |
| if (inputCasId != null && !inputCasId.equals(casStateEntry.getInputCasReferenceId())) { |
| casStateEntry.setInputCasReferenceId(inputCasId); |
| cacheEntry.setInputCasReferenceId(inputCasId); |
| } |
| } |
| // Send CAS to a given reply endpoint |
| sendVMMessage(mType, replyEndpoint, cacheEntry); |
| } |
| } |
| |
| private boolean validEndpoint(Endpoint endpoint, CasStateEntry casStateEntry) { |
| if (endpoint == null) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "validEndpoint", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_client_endpoint_not_found__INFO", |
| new Object[] { getComponentName(), casStateEntry.getCasReferenceId() }); |
| } |
| return false; |
| } |
| if (endpoint.getEndpoint() == null) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "validEndpoint", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_no_reply_destination__INFO", |
| new Object[] { casStateEntry.getCasReferenceId() }); |
| } |
| HashMap map = new HashMap(); |
| map.put(AsynchAEMessage.Command, AsynchAEMessage.Process); |
| map.put(AsynchAEMessage.CasReference, casStateEntry.getCasReferenceId()); |
| handleError(map, new UnknownDestinationException()); |
| return false; |
| } |
| // Dont send a reply to the client if the client is a CAS multiplier |
| if (endpoint.isCasMultiplier()) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| private Endpoint replyToClient(CacheEntry cacheEntry, CasStateEntry casStateEntry) |
| throws Exception { |
| Endpoint endpoint = getReplyEndpoint(cacheEntry, casStateEntry); |
| if (!validEndpoint(endpoint, casStateEntry)) { |
| return null; // the reason has already been logged |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINEST, |
| CLASS_NAME.getName(), |
| "replyToClient", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_final_step__FINEST", |
| new Object[] { casStateEntry.getCasReferenceId(), |
| (double) (System.nanoTime() - endpoint.getEntryTime()) / (double) 1000000 }); |
| } |
| endpoint.setFinal(true); |
| if (!isStopped()) { |
| try { |
| if (endpoint.isRemote()) { |
| sendReplyToRemoteClient(cacheEntry, casStateEntry, endpoint); |
| } else { |
| sendReplyToCollocatedClient(cacheEntry, casStateEntry, endpoint); |
| } |
| } catch ( Exception e) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), |
| "replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| |
| } |
| } |
| return endpoint; |
| } |
| |
| private void sendVMMessage(int messageType, Endpoint endpoint, CacheEntry cacheEntry) |
| throws Exception { |
| // If the CAS was produced by this aggregate send the request message to the client |
| // Otherwise send the response message. |
| UimaTransport transport = getTransport(endpoint.getEndpoint()); |
| UimaMessage message = transport.produceMessage(AsynchAEMessage.Process, messageType, getName()); |
| if (cacheEntry.getCasProducerAggregateName() != null |
| && cacheEntry.getCasProducerAggregateName().equals(getComponentName())) { |
| message.addLongProperty(AsynchAEMessage.CasSequence, cacheEntry.getCasSequence()); |
| } |
| message.addStringProperty(AsynchAEMessage.CasReference, cacheEntry.getCasReferenceId()); |
| if (cacheEntry.getInputCasReferenceId() != null) { |
| message.addStringProperty(AsynchAEMessage.InputCasReference, cacheEntry |
| .getInputCasReferenceId()); |
| } |
| ServicePerformance casStats = getCasStatistics(cacheEntry.getCasReferenceId()); |
| |
| message.addLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats |
| .getRawCasSerializationTime()); |
| message.addLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats |
| .getRawCasDeserializationTime()); |
| message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime()); |
| long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process); |
| message.addLongProperty(AsynchAEMessage.IdleTime, iT); |
| // Send reply back to the client. Use internal (non-jms) transport |
| transport.getUimaMessageDispatcher(endpoint.getEndpoint()).dispatch(message); |
| } |
| |
| private Endpoint getReplyEndpoint(CacheEntry cacheEntry, CasStateEntry casStateEntry) |
| throws Exception { |
| Endpoint endpoint = null; |
| // Get the endpoint that represents a client that send the request |
| // to this service. If the first arg to getEndpoint() is null, the method |
| // should return the origin. |
| if (isTopLevelComponent()) { |
| if (casStateEntry.isSubordinate()) { |
| endpoint = getInProcessCache().getTopAncestorEndpoint(cacheEntry); |
| } else { |
| endpoint = getInProcessCache().getEndpoint(null, casStateEntry.getCasReferenceId()); |
| } |
| } else { |
| endpoint = getReplyEndpoint(cacheEntry); |
| dropFlow(casStateEntry.getCasReferenceId(), false); |
| } |
| return endpoint; |
| } |
| |
| private Endpoint getReplyEndpoint(CacheEntry cacheEntry) throws Exception { |
| if (cacheEntry == null) { |
| return null; |
| } |
| Endpoint endpoint = getMessageOrigin(cacheEntry.getCasReferenceId()); |
| if (endpoint == null && cacheEntry.getInputCasReferenceId() != null) { |
| // Recursively call self until an endpoint is found |
| endpoint = getReplyEndpoint(getInProcessCache().getCacheEntryForCAS( |
| cacheEntry.getInputCasReferenceId())); |
| } |
| return endpoint; |
| } |
| |
| private void executeFlowStep(FlowContainer aFlow, String aCasReferenceId, boolean newCAS) |
| throws AsynchAEException { |
| Step step = null; |
| try { |
| // Guard a call to next(). Allow one thread and block the rest |
| synchronized( flowControllerContainer ) { |
| step = aFlow.next(); |
| } |
| } catch (Exception e) { |
| // Any error here is automatic termination |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), |
| "executeFlowStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| try { |
| sendReplyWithShutdownException(aCasReferenceId); |
| |
| // getInProcessCache().destroy(); |
| ErrorContext ec = new ErrorContext(); |
| ec.add(ErrorContext.THROWABLE_ERROR, e); |
| ec.add(AsynchAEMessage.CasReference, aCasReferenceId); |
| handleAction(ErrorHandler.TERMINATE, null, ec); |
| } catch (Exception ex) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), |
| "executeFlowStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| } |
| } |
| return; |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "executeFlowStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_step__FINEST", |
| new Object[] { getComponentName(), aCasReferenceId }); |
| } |
| |
| try { |
| if (step instanceof SimpleStep) { |
| simpleStep((SimpleStep) step, aCasReferenceId); |
| } else if (step instanceof ParallelStep) { |
| parallelStep((ParallelStep) step, aCasReferenceId); |
| } else if (step instanceof FinalStep) { |
| // Special case: check if this CAS has just been produced by a Cas Multiplier. |
| // If so, we received a new CAS but there are no delegates in the pipeline. |
| // The CM was the last in the flow. In this case, set a property in the cache |
| // to simulate receipt of the reply to this CAS. This is so that the CAS is |
| // released in the finalStep() when the Aggregate is not a Cas Multiplier. |
| if (newCAS) { |
| CasStateEntry casStateEntry = localCache.lookupEntry(aCasReferenceId); |
| if (casStateEntry != null) { |
| casStateEntry.setReplyReceived(); |
| } |
| } |
| finalStep((FinalStep) step, aCasReferenceId); |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), |
| "executeFlowStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_completed_step__FINEST", |
| new Object[] { getComponentName(), aCasReferenceId }); |
| } |
| |
| } catch (Exception e) { |
| HashMap map = new HashMap(); |
| map.put(AsynchAEMessage.Command, AsynchAEMessage.Process); |
| map.put(AsynchAEMessage.CasReference, aCasReferenceId); |
| handleError(map, e); |
| } |
| } |
| |
| private void dispatch(String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException { |
| if (!anEndpoint.isRemote()) { |
| try { |
| UimaTransport transport = getTransport(anEndpoint.getEndpoint()); |
| UimaMessage message = transport.produceMessage(AsynchAEMessage.Process, |
| AsynchAEMessage.Request, getName()); |
| message.addStringProperty(AsynchAEMessage.CasReference, aCasReferenceId); |
| transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message); |
| |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), |
| "dispatch", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| } |
| } |
| } else { |
| // Check delegate's state before sending it a CAS. The delegate |
| // may have previously timed out and is in a process of pinging |
| // the delegate to check its availability. While the delegate |
| // is in this state, delay CASes by placing them on a list of |
| // CASes pending dispatch. Once the ping reply is received all |
| // delayed CASes will be dispatched to the delegate. |
| if (!delayCasIfDelegateInTimedOutState(aCasReferenceId, anEndpoint.getDelegateKey())) { |
| // The delegate is in the normal state so send it this CAS |
| getOutputChannel().sendRequest(aCasReferenceId, anEndpoint); |
| } |
| } |
| } |
| |
| /** |
| * Checks the state of a delegate to see if it is in TIMEOUT State. If it is, push the CAS id onto |
| * a list of CASes pending dispatch. The delegate is in a questionable state and the aggregate |
| * sends a ping message to check delegate's availability. If the delegate responds to the ping, |
| * all CASes in the pending dispatch list will be immediately dispatched. |
| **/ |
| public boolean delayCasIfDelegateInTimedOutState(String aCasReferenceId, String aDelegateKey) |
| throws AsynchAEException { |
| Delegate delegate = lookupDelegate(aDelegateKey); |
| if (delegate != null && delegate.getState() == Delegate.TIMEOUT_STATE) { |
| // Add CAS id to the list of delayed CASes. |
| int listSize = delegate.addCasToPendingDispatchList(aCasReferenceId); |
| // If the list was empty (before the add), send the GetMeta request |
| // as a PING to see if the delegate service is alive. |
| if (listSize == 1) { |
| delegate.setAwaitingPingReply(); |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "delayCasIfDelegateInTimedOutState", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_aggregate_sending_ping__INFO", |
| new Object[] { getComponentName(), delegate.getKey() }); |
| } |
| retryMetadataRequest(delegate.getEndpoint()); |
| } |
| return true; |
| } |
| return false; // Cas Not Delayed |
| } |
| |
| private void dispatchProcessRequest(String aCasReferenceId, Endpoint anEndpoint, |
| boolean addEndpointToCache) throws AsynchAEException { |
| if (addEndpointToCache) { |
| getInProcessCache().addEndpoint(anEndpoint, aCasReferenceId); |
| } |
| anEndpoint.setController(this); |
| dispatch(aCasReferenceId, anEndpoint); |
| |
| } |
| |
| public void retryProcessCASRequest(String aCasReferenceId, Endpoint anEndpoint, |
| boolean addEndpointToCache) throws AsynchAEException { |
| Endpoint endpoint = null; |
| String key = lookUpDelegateKey(anEndpoint.getEndpoint()); |
| |
| if ((endpoint = getInProcessCache().getEndpoint(anEndpoint.getEndpoint(), aCasReferenceId)) != null) { |
| Endpoint masterEndpoint = lookUpEndpoint(key, true); |
| // check if the master endpoint destination has changed. This can be a case when |
| // a new temp queue is created when the previous temp queue is destroyed due to |
| // a broken connection. |
| if (masterEndpoint.getDestination() != null) { |
| // Make sure that we use the current destination for replies |
| if (!masterEndpoint.getDestination().toString() |
| .equals(endpoint.getDestination().toString())) { |
| // Override the endopoint reply-to destination with the master destination |
| endpoint.setDestination(masterEndpoint.getDestination()); |
| } |
| } |
| } else { |
| endpoint = anEndpoint; |
| endpoint = lookUpEndpoint(key, true); |
| getInProcessCache().addEndpoint(endpoint, aCasReferenceId); |
| } |
| dispatchProcessRequest(aCasReferenceId, endpoint, addEndpointToCache); |
| } |
| |
| private void dispatchProcessRequest(String aCasReferenceId, Endpoint[] anEndpointList, |
| boolean addEndpointToCache) throws AsynchAEException { |
| List<Endpoint> endpointList = new ArrayList<Endpoint>(); |
| for (int i = 0; i < anEndpointList.length; i++) { |
| // Check if the delegate previously timed out. If so, add the CAS |
| // Id to the list pending dispatch. This list holds CASes that are |
| // delayed until the service responds to a Ping. |
| if (delayCasIfDelegateInTimedOutState(aCasReferenceId, anEndpointList[i].getEndpoint())) { |
| // The CAS was delayed until the delegate responds to a Ping |
| continue; |
| } else { |
| endpointList.add(anEndpointList[i]); |
| } |
| if (addEndpointToCache) { |
| getInProcessCache().addEndpoint(anEndpointList[i], aCasReferenceId); |
| } |
| } |
| Endpoint[] endpoints = new Endpoint[endpointList.size()]; |
| endpointList.toArray(endpoints); |
| getOutputChannel().sendRequest(aCasReferenceId, endpoints); |
| } |
| |
| public boolean isDelegateKeyValid(String aDelegateKey) { |
| if (destinationMap.containsKey(aDelegateKey)) { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| public String lookUpDelegateKey(String anEndpointName) { |
| return lookUpDelegateKey(anEndpointName, null); |
| } |
| |
| /** |
| * Returns a delegate key given an endpoint (queue) name and a server uri. If a server is null, |
| * only the endpoint name will be used for matching. |
| */ |
| public String lookUpDelegateKey(String anEndpointName, String server) { |
| String key = null; |
| if (destinationToKeyMap.containsKey(anEndpointName)) { |
| Set keys = destinationMap.keySet(); |
| Iterator it = keys.iterator(); |
| // Find an endpoint for the GetMeta reply. To succeed, match the endpoint (queue) name |
| // as well as the server URI. We allow endpoints managed by different servers to have |
| // the same queue name. |
| // iterate over all endpoints until a match [queue,server] is found. |
| while (it.hasNext()) { |
| key = (String) it.next(); |
| Endpoint_impl endp = (Endpoint_impl) destinationMap.get(key); |
| |
| // Check if a queue name matches |
| if (endp != null && endp.getEndpoint().equalsIgnoreCase(anEndpointName)) { |
| // Check if server match is requested as well |
| if (server != null) { |
| // server URIs must match |
| if (endp.getServerURI() != null && endp.getServerURI().equalsIgnoreCase(server)) { |
| // found a match for [queue,server] |
| break; |
| } |
| // Not found yet. Reset the key |
| key = null; |
| continue; |
| } |
| // found a match for [queue] |
| break; |
| } |
| // Not found yet. Reset the key |
| key = null; |
| } |
| } |
| |
| return key; |
| } |
| |
| public Endpoint lookUpEndpoint(String anAnalysisEngineKey, boolean clone) |
| throws AsynchAEException { |
| Endpoint endpoint = (Endpoint) destinationMap.get(anAnalysisEngineKey); |
| if (endpoint != null && clone) { |
| return (Endpoint) ((Endpoint_impl) endpoint).clone(); // (Endpoint) |
| } |
| return endpoint; |
| } |
| |
| public PrimitiveServiceInfo getDelegateServiceInfo(String aDelegateKey) { |
| if (delegateStatMap == null || aDelegateKey == null || delegateStatMap.containsKey(aDelegateKey) == false) { |
| return null; |
| } |
| Object[] delegateStats; |
| delegateStats = (Object[]) delegateStatMap.get(aDelegateKey); |
| if (delegateStats != null) { |
| return (PrimitiveServiceInfo) delegateStats[SERVICE_INFO_INDX]; |
| } |
| return null; |
| } |
| |
| public ServicePerformance getDelegateServicePerformance(String aDelegateKey) { |
| Object[] delegateStats; |
| delegateStats = (Object[]) delegateStatMap.get(aDelegateKey); |
| if (delegateStats != null) { |
| return (ServicePerformance) delegateStats[SERVICE_PERF_INDX]; |
| } |
| |
| return null; |
| } |
| |
| public ServiceErrors getDelegateServiceErrors(String aDelegateKey) { |
| Object[] delegateStats; |
| delegateStats = (Object[]) delegateStatMap.get(aDelegateKey); |
| if (delegateStats != null) { |
| return (ServiceErrors) delegateStats[SERVICE_ERROR_INDX]; |
| } |
| return null; |
| } |
| |
| public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException { |
| mergeTypeSystem(aTypeSystem, fromDestination, null); |
| } |
| |
| public synchronized void mergeTypeSystem(String aTypeSystem, String fromDestination, |
| String fromServer) throws AsynchAEException { |
| |
| try { |
| // Find the endpoint for this service, given its input queue name and broker URI. |
| // We now allow endpoints managed by different servers to have the same queue name. |
| // But if the external name of the broker is unknown (i.e. an old 2.2.2 service) |
| // then use just the queue name, i.e. queue names must be unique for 2.2.2 |
| Endpoint_impl endpoint = null; |
| String key = lookUpDelegateKey(fromDestination, fromServer); |
| if (key != null) { |
| endpoint = (Endpoint_impl) destinationMap.get(key); |
| } |
| if (endpoint == null) { |
| // Log invalid reply and move on |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "mergeTypeSystem", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_metadata_recvd_from_invalid_delegate__INFO", |
| new Object[] { getName(), fromDestination }); |
| } |
| } else if (endpoint.isWaitingForResponse()) { |
| endpoint.setWaitingForResponse(false); |
| endpoint.cancelTimer(); |
| boolean collocatedAggregate = false; |
| if ( endpoint.getServiceInfo() != null ) { |
| endpoint.getServiceInfo().setState(ServiceState.RUNNING.name()); |
| } |
| ResourceMetaData resource = null; |
| ServiceInfo remoteDelegateServiceInfo = null; |
| if (aTypeSystem.trim().length() > 0) { |
| if (endpoint.isRemote()) { |
| System.out.println("Remote Service:" + key |
| + " Initialized. Ready To Process Messages From Queue:" |
| + endpoint.getEndpoint()); |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), |
| "mergeTypeSystem", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_remote_delegate_ready__CONFIG", |
| new Object[] { getComponentName(), fromDestination }); |
| } |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), |
| "mergeTypeSystem", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_merge_ts_from_delegate__CONFIG", new Object[] { fromDestination }); |
| } |
| ByteArrayInputStream bis = new ByteArrayInputStream(aTypeSystem.getBytes()); |
| XMLInputSource in1 = new XMLInputSource(bis, null); |
| |
| resource = UIMAFramework.getXMLParser().parseResourceMetaData(in1); |
| if (isStopped()) { |
| return; |
| } |
| getCasManagerWrapper().addMetadata((ProcessingResourceMetaData) resource); |
| analysisEngineMetaDataMap.put(key, (ProcessingResourceMetaData) resource); |
| |
| if (((ProcessingResourceMetaData) resource).getOperationalProperties() |
| .getOutputsNewCASes()) { |
| endpoint.setIsCasMultiplier(true); |
| remoteCasMultiplierList.add(key); |
| } |
| if (endpoint.isRemote()) { |
| Object o = null; |
| remoteDelegateServiceInfo = getDelegateServiceInfo(key); |
| if (remoteDelegateServiceInfo != null |
| && (o = ((ProcessingResourceMetaData) resource) |
| .getConfigurationParameterSettings().getParameterValue( |
| AnalysisEngineController.AEInstanceCount)) != null) { |
| ((PrimitiveServiceInfo) remoteDelegateServiceInfo) |
| .setAnalysisEngineInstanceCount(((Integer) o).intValue()); |
| } |
| } |
| } else { |
| collocatedAggregate = true; |
| } |
| |
| endpoint.setInitialized(true); |
| // If getMeta request not yet sent, send meta request to all remote delegate |
| // Special case when all delegates are remote is handled in the setInputChannel |
| |
| synchronized (unregisteredDelegateList) { |
| // TODO can't find where this list is checked. Is it still used??? |
| if (requestForMetaSentToRemotes == false && !allDelegatesAreRemote) { |
| String unregisteredDelegateKey = null; |
| for (int i = 0; i < unregisteredDelegateList.size(); i++) { |
| unregisteredDelegateKey = (String) unregisteredDelegateList.get(i); |
| if (unregisteredDelegateKey.equals(key)) { |
| unregisteredDelegateList.remove(i); |
| } |
| } |
| } |
| } |
| |
| // |
| if (collocatedAggregate || resource instanceof ProcessingResourceMetaData) { |
| if (allTypeSystemsMerged()) { |
| |
| for (int i = 0; i < remoteCasMultiplierList.size(); i++) { |
| Endpoint endpt = (Endpoint) destinationMap.get((String) remoteCasMultiplierList |
| .get(i)); |
| if (endpt != null && endpt.isCasMultiplier() && endpt.isRemote()) { |
| System.out.println("Setting Shadow Pool of Size:" + endpt.getShadowPoolSize() |
| + " For Cas Multiplier:" + (String) remoteCasMultiplierList.get(i)); |
| getCasManagerWrapper().initialize(endpt.getShadowPoolSize(), |
| (String) remoteCasMultiplierList.get(i)); |
| if (remoteDelegateServiceInfo != null) { |
| remoteDelegateServiceInfo.setCASMultiplier(); |
| } |
| } |
| } |
| if (!isStopped()) { |
| try { |
| completeInitialization(); |
| } catch (ResourceInitializationException ex) { |
| handleInitializationError(ex); |
| return; |
| } |
| } |
| } |
| } |
| } |
| } catch (Exception e) { |
| throw new AsynchAEException(e); |
| } |
| } |
| |
| private synchronized void completeInitialization() throws Exception { |
| if (initialized) { |
| return; |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), |
| "completeInitialization", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_all_ts_merged__CONFIG"); |
| } |
| if (errorHandlerChain == null) { |
| plugInDefaultErrorHandlerChain(); |
| } |
| AnalysisEngineDescription specifier = (AnalysisEngineDescription) super.getResourceSpecifier(); |
| aggregateMetadata = specifier.getAnalysisEngineMetaData(); |
| flowControllerContainer = UimaClassFactory.produceAggregateFlowControllerContainer(specifier, |
| flowControllerDescriptor, analysisEngineMetaDataMap, getUimaContextAdmin(), |
| ((AnalysisEngineDescription) getResourceSpecifier()).getSofaMappings(), super |
| .getManagementInterface()); |
| if (isTopLevelComponent()) { |
| // Add FC's meta |
| getCasManagerWrapper().addMetadata((ProcessingResourceMetaData)flowControllerContainer.getMetaData()); |
| // Top level component is the outer most component in the containment hierarchy. |
| getCasManagerWrapper().initialize("AggregateContext"); |
| aggregateMetadata.setTypeSystem(getCasManagerWrapper().getMetadata().getTypeSystem()); |
| aggregateMetadata.setTypePriorities(getCasManagerWrapper().getMetadata().getTypePriorities()); |
| |
| aggregateMetadata.setFsIndexCollection(getCasManagerWrapper().getMetadata() |
| .getFsIndexCollection()); |
| } |
| if (disabledDelegateList.size() > 0) { |
| flowControllerContainer.removeAnalysisEngines(disabledDelegateList); |
| } |
| // start a cleanup thread |
| ((BaseAnalysisEngineController) this).startServiceCleanupThread(30000); // sleep for 30 secs |
| |
| // Before processing CASes, send notifications to all collocated delegates to |
| // complete initialization. Currently this call forces all collocated Cas Multiplier delegates |
| // to initialize their internal Cas Pools. CM Cas Pool is lazily initialized on |
| // the first process call. The JMX Monitor needs all the Cas Pools to initialize |
| // before the process call. |
| onInitialize(); |
| |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "completeInitialization", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_initialized_controller__INFO", new Object[] { getComponentName() }); |
| } |
| |
| // Open latch to allow messages to be processed. The |
| // latch was closed to prevent messages from entering |
| // the controller before it is initialized. |
| latch.openLatch(getName(), isTopLevelComponent(), true); |
| initialized = true; |
| // Notify client listener that the initialization of the controller was successfull |
| notifyListenersWithInitializationStatus(null); |
| // If this is a collocated aggregate change its state to RUNNING from INITIALIZING. |
| // The top level aggregate state is changed when listeners on its input queue are |
| // succesfully started in SpringContainerDeployer.doStartListeners() method. |
| if ( !isTopLevelComponent() ) { |
| changeState(ServiceState.RUNNING); |
| } |
| } |
| |
| private String findKeyForValue(String fromDestination) { |
| |
| Set set = destinationMap.entrySet(); |
| for (Iterator it = set.iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| Endpoint endpoint = (Endpoint) entry.getValue(); |
| if (endpoint != null) { |
| String value = endpoint.getEndpoint(); |
| if (value.equals(fromDestination)) { |
| return (String) entry.getKey(); |
| } |
| } |
| } |
| |
| return null; |
| } |
| |
| private boolean allTypeSystemsMerged() { |
| if (typeSystemsMerged) { |
| return true; |
| } |
| Set set = destinationMap.entrySet(); |
| int delegateCount = 0; |
| for (Iterator it = set.iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| Endpoint endpoint = (Endpoint) entry.getValue(); |
| if (endpoint != null && endpoint.getStatus() != Endpoint.DISABLED |
| && endpoint.isInitialized() == false) { |
| break; // At least one delegate has not replied to GetMeta Request |
| } |
| delegateCount++; |
| } |
| if (delegateCount == destinationMap.size()) { |
| return true; // All delegates responded to GetMeta request |
| } |
| return false; |
| } |
| |
| public void initialize() throws AsynchAEException { |
| Statistic statistic = null; |
| if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.ProcessCount)) == null) { |
| statistic = new LongNumericStatistic(Monitor.ProcessCount); |
| getMonitor().addStatistic("", statistic); |
| } |
| if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.ProcessErrorCount)) == null) { |
| statistic = new LongNumericStatistic(Monitor.ProcessErrorCount); |
| getMonitor().addStatistic("", statistic); |
| } |
| if ((statistic = getMonitor().getLongNumericStatistic("", Monitor.ProcessErrorRetryCount)) == null) { |
| statistic = new LongNumericStatistic(Monitor.ProcessErrorRetryCount); |
| getMonitor().addStatistic("", statistic); |
| } |
| } |
| |
| public void dispatchMetadataRequest(Endpoint anEndpoint) throws AsynchAEException { |
| if (isStopped()) { |
| return; |
| } |
| if (anEndpoint == null) { |
| throw new AsynchAEException("Controller:" + getComponentName() |
| + " Unable To Dispatch GetMeta Request. Provided Endpoint is Invalid (NULL)"); |
| } |
| anEndpoint.startMetadataRequestTimer(); |
| anEndpoint.setController(this); |
| anEndpoint.setWaitingForResponse(true); |
| String key = lookUpDelegateKey(anEndpoint.getEndpoint()); |
| if (key != null && !delegateStatMap.containsKey(key)) { |
| if (key != null) { |
| ServiceInfo serviceInfo = anEndpoint.getServiceInfo(); |
| PrimitiveServiceInfo pServiceInfo = new PrimitiveServiceInfo(serviceInfo.isCASMultiplier(),null); |
| pServiceInfo.setBrokerURL(serviceInfo.getBrokerURL()); |
| pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName()); |
| pServiceInfo.setState(serviceInfo.getState()); |
| pServiceInfo.setAnalysisEngineInstanceCount(1); |
| ServicePerformance servicePerformance = new ServicePerformance(); |
| if (anEndpoint.isRemote()) { |
| servicePerformance.setRemoteDelegate(); |
| } |
| ServiceErrors serviceErrors = new ServiceErrors(); |
| |
| serviceErrorMap.put(key, serviceErrors); |
| Object[] delegateStatsArray = new Object[] { pServiceInfo, servicePerformance, |
| serviceErrors }; |
| |
| delegateStatMap.put(key, delegateStatsArray); |
| |
| } |
| } |
| getOutputChannel().sendRequest(AsynchAEMessage.GetMeta, anEndpoint); |
| } |
| |
| public void retryMetadataRequest(Endpoint anEndpoint) throws AsynchAEException { |
| dispatchMetadataRequest(anEndpoint); |
| } |
| |
| public void sendMetadata(Endpoint anEndpoint) { |
| super.sendMetadata(anEndpoint, aggregateMetadata); |
| } |
| |
| public ControllerLatch getControllerLatch() { |
| return latch; |
| } |
| |
| public Monitor getMonitor() { |
| return super.monitor; |
| } |
| |
| public void setMonitor(Monitor monitor) { |
| this.monitor = monitor; |
| } |
| |
| public void handleDelegateLifeCycleEvent(String anEndpoint, int aDelegateCount) { |
| Endpoint endpoint = null; |
| if (destinationMap.containsKey(anEndpoint)) { |
| endpoint = (Endpoint) destinationMap.get(anEndpoint); |
| } |
| if (aDelegateCount == 0) { |
| |
| // Set the state of the delegate endpoint |
| endpoint.setNoConsumers(true); |
| // Get entries from the in-process cache that match the endpoint name |
| CacheEntry[] cachedEntries = getInProcessCache().getCacheEntriesForEndpoint(anEndpoint); |
| |
| for (int i = 0; cachedEntries != null && i < cachedEntries.length; i++) { |
| String casReferenceId = cachedEntries[i].getCasReferenceId(); |
| String parentCasReferenceId = null; |
| try { |
| CasStateEntry parentEntry = getLocalCache().getTopCasAncestor(casReferenceId);// cachedEntries[i].getInputCasReferenceId(); |
| if (parentEntry != null) { |
| parentCasReferenceId = parentEntry.getCasReferenceId(); |
| } |
| } catch (Exception e) { |
| System.out.println("Controller:" + getComponentName() + " Parent CAS For CAS:" |
| + casReferenceId + " Not Found In Cache"); |
| } |
| getInProcessCache().getEndpoint(anEndpoint, casReferenceId).cancelTimer(); |
| Endpoint requestOrigin = cachedEntries[i].getMessageOrigin(); |
| try { |
| getOutputChannel().sendReply( |
| new UimaEEServiceException("Delegates Not Found To Process CAS on Endpoint:" |
| + anEndpoint), casReferenceId, parentCasReferenceId, requestOrigin, |
| AsynchAEMessage.Process); |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "handleDelegateLifeCycleEvent", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_no_consumers__INFO", new Object[] { casReferenceId, anEndpoint }); |
| } |
| } finally { |
| getInProcessCache().remove(casReferenceId); |
| } |
| } |
| } else if (endpoint != null && endpoint.hasNoConsumers()) { |
| // At least one consumer is available |
| endpoint.setNoConsumers(false); |
| } |
| |
| } |
| |
| public void retryLastCommand(int aCommand, Endpoint anEndpoint, String aCasReferenceId) { |
| try { |
| if (AsynchAEMessage.Process == aCommand) { |
| getOutputChannel().sendRequest(aCasReferenceId, anEndpoint); |
| } else { |
| getOutputChannel().sendRequest(aCommand, anEndpoint); |
| } |
| } catch (AsynchAEException e) { |
| |
| } |
| } |
| |
| public ServiceErrors getServiceErrors(String aDelegateKey) { |
| if (!serviceErrorMap.containsKey(aDelegateKey)) { |
| return null; |
| } |
| return (ServiceErrors) serviceErrorMap.get(aDelegateKey); |
| } |
| |
| public AggregateServiceInfo getServiceInfo() { |
| if (serviceInfo == null) { |
| serviceInfo = new AggregateServiceInfo(isCasMultiplier(), this); |
| // if this is a top level service and the input channel not yet initialized |
| // block in getInputChannel() on the latch |
| if (isTopLevelComponent() && getInputChannel() != null) { |
| serviceInfo.setInputQueueName(getInputChannel().getName()); |
| serviceInfo.setBrokerURL(super.getBrokerURL()); |
| } else { |
| serviceInfo.setInputQueueName(getName()); |
| serviceInfo.setBrokerURL("vm://localhost"); |
| } |
| serviceInfo.setDeploymentDescriptorPath(super.aeDescriptor); |
| //serviceInfo.setState(super.getState().name()); |
| } |
| return serviceInfo; |
| } |
| |
| public ServicePerformance getServicePerformance(String aDelegateKey) { |
| return getDelegateServicePerformance(aDelegateKey); |
| } |
| |
| /** |
| * Accumulate analysis time for the aggregate |
| * |
| * @param anAnalysisTime |
| */ |
| public synchronized void incrementAnalysisTime(long anAnalysisTime) { |
| servicePerformance.incrementAnalysisTime(anAnalysisTime); |
| } |
| |
| public void stopTimers() { |
| Set set = destinationMap.entrySet(); |
| for (Iterator it = set.iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| Endpoint endpoint = (Endpoint) entry.getValue(); |
| if (endpoint != null) { |
| endpoint.cancelTimer(); |
| } |
| } |
| |
| } |
| |
| public boolean requestForMetaSentToRemotes() { |
| return requestForMetaSentToRemotes; |
| } |
| |
| public void setRequestForMetaSentToRemotes() { |
| requestForMetaSentToRemotes = true; |
| } |
| |
| public void cleanUp() { |
| if (flowMap != null) { |
| flowMap.clear(); |
| } |
| if (destinationMap != null) { |
| destinationMap.clear(); |
| } |
| |
| if (destinationToKeyMap != null) { |
| destinationToKeyMap.clear(); |
| } |
| if (analysisEngineMetaDataMap != null) { |
| analysisEngineMetaDataMap.clear(); |
| } |
| if (remoteCasMultiplierList != null) { |
| remoteCasMultiplierList.clear(); |
| } |
| if (originMap != null) { |
| originMap.clear(); |
| } |
| |
| if (childControllerList != null) { |
| synchronized( childControllerList ) { |
| childControllerList.clear(); |
| } |
| } |
| if (delegateStats != null) { |
| delegateStats.clear(); |
| } |
| if (flowControllerContainer != null) { |
| synchronized (flowControllerContainer) { |
| flowControllerContainer.destroy(); |
| } |
| } |
| perCasStatistics.clear(); |
| |
| if (disabledDelegateList != null) { |
| disabledDelegateList.clear(); |
| } |
| |
| if (delegateStatMap != null) { |
| delegateStatMap.clear(); |
| } |
| |
| } |
| |
| public void stop() { |
| super.stop(); |
| this.cleanUp(); |
| } |
| |
| public List getChildControllerList() { |
| return childControllerList; |
| } |
| |
| /** |
| * Force all collocated delegates to perform any post-initialization steps. |
| */ |
| public void onInitialize() { |
| // For each collocated delegate |
| synchronized(childControllerList) { |
| if ( childControllerList.size() > 0 ) { |
| for( AnalysisEngineController childController : childControllerList ) { |
| // notify the delegate |
| childController.onInitialize(); |
| } |
| } |
| } |
| } |
| |
| public LocalCache getLocalCache() { |
| return localCache; |
| } |
| |
| /** |
| * Return {@link Delegate} object for a given delegate key. |
| * |
| */ |
| public Delegate lookupDelegate(String aDelegateKey) { |
| |
| for (Delegate delegate : delegates) { |
| if (delegate.getKey().equals(aDelegateKey)) { |
| return delegate; |
| } |
| } |
| return null; |
| } |
| |
| } |