blob: e11c4236bf13c1c88165332ffe2cf29b80bc4784 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.aae.controller;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.AsynchAECasManager;
import org.apache.uima.aae.InProcessCache;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.ErrorContext;
import org.apache.uima.aae.error.ErrorHandler;
import org.apache.uima.aae.error.ServiceShutdownException;
import org.apache.uima.aae.jmx.JmxManagement;
import org.apache.uima.aae.jmx.PrimitiveServiceInfo;
import org.apache.uima.aae.jmx.ServicePerformance;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.aae.monitor.Monitor;
import org.apache.uima.aae.spi.transport.UimaMessage;
import org.apache.uima.aae.spi.transport.UimaTransport;
import org.apache.uima.analysis_engine.AnalysisEngine;
import org.apache.uima.analysis_engine.AnalysisEngineDescription;
import org.apache.uima.analysis_engine.CasIterator;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.impl.CASImpl;
import org.apache.uima.cas.impl.OutOfTypeSystemData;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.resource.metadata.ConfigurationParameter;
import org.apache.uima.resource.metadata.impl.ConfigurationParameter_impl;
import org.apache.uima.util.InvalidXMLException;
import org.apache.uima.util.Level;
public class PrimitiveAnalysisEngineController_impl extends BaseAnalysisEngineController implements
PrimitiveAnalysisEngineController {
private static final Class CLASS_NAME = PrimitiveAnalysisEngineController_impl.class;
// Stores AE metadata
private AnalysisEngineMetaData analysisEngineMetadata;
// Number of AE instances
private int analysisEnginePoolSize;
// Mutex
protected Object notifyObj = new Object();
// Temp list holding instances of AE
private List aeList = new ArrayList();
// Stores service info for JMX
private PrimitiveServiceInfo serviceInfo = null;
// Pool containing instances of AE. The default implementation provides Thread affinity
// meaning each thread executes the same AE instance.
private AnalysisEngineInstancePool aeInstancePool = null;
private String abortedCASReferenceId = null;
// Create a shared semaphore to serialize creation of AE instances.
// There is a single instance of this semaphore per JVM and it
// guards uima core code that is not thread safe.
private static Semaphore sharedInitSemaphore = new Semaphore(1);
public PrimitiveAnalysisEngineController_impl(String anEndpointName,
String anAnalysisEngineDescriptor, AsynchAECasManager aCasManager,
InProcessCache anInProcessCache, int aWorkQueueSize, int anAnalysisEnginePoolSize)
throws Exception {
this(null, anEndpointName, anAnalysisEngineDescriptor, aCasManager, anInProcessCache,
aWorkQueueSize, anAnalysisEnginePoolSize, 0);
}
public PrimitiveAnalysisEngineController_impl(AnalysisEngineController aParentController,
String anEndpointName, String anAnalysisEngineDescriptor, AsynchAECasManager aCasManager,
InProcessCache anInProcessCache, int aWorkQueueSize, int anAnalysisEnginePoolSize)
throws Exception {
this(aParentController, anEndpointName, anAnalysisEngineDescriptor, aCasManager,
anInProcessCache, aWorkQueueSize, anAnalysisEnginePoolSize, 0);
}
public PrimitiveAnalysisEngineController_impl(AnalysisEngineController aParentController,
String anEndpointName, String anAnalysisEngineDescriptor, AsynchAECasManager aCasManager,
InProcessCache anInProcessCache, int aWorkQueueSize, int anAnalysisEnginePoolSize,
int aComponentCasPoolSize) throws Exception {
this(aParentController, anEndpointName, anAnalysisEngineDescriptor, aCasManager,
anInProcessCache, aWorkQueueSize, anAnalysisEnginePoolSize, aComponentCasPoolSize, null);
}
public PrimitiveAnalysisEngineController_impl(AnalysisEngineController aParentController,
String anEndpointName, String anAnalysisEngineDescriptor, AsynchAECasManager aCasManager,
InProcessCache anInProcessCache, int aWorkQueueSize, int anAnalysisEnginePoolSize,
int aComponentCasPoolSize, long anInitialCasHeapSize) throws Exception {
this(aParentController, anEndpointName, anAnalysisEngineDescriptor, aCasManager,
anInProcessCache, aWorkQueueSize, anAnalysisEnginePoolSize, aComponentCasPoolSize,
anInitialCasHeapSize, null);
}
public PrimitiveAnalysisEngineController_impl(AnalysisEngineController aParentController,
String anEndpointName, String anAnalysisEngineDescriptor, AsynchAECasManager aCasManager,
InProcessCache anInProcessCache, int aWorkQueueSize, int anAnalysisEnginePoolSize,
int aComponentCasPoolSize, JmxManagement aJmxManagement) throws Exception {
this(aParentController, anEndpointName, anAnalysisEngineDescriptor, aCasManager,
anInProcessCache, aWorkQueueSize, anAnalysisEnginePoolSize, aComponentCasPoolSize, 0,
aJmxManagement);
}
public PrimitiveAnalysisEngineController_impl(AnalysisEngineController aParentController,
String anEndpointName, String anAnalysisEngineDescriptor, AsynchAECasManager aCasManager,
InProcessCache anInProcessCache, int aWorkQueueSize, int anAnalysisEnginePoolSize,
int aComponentCasPoolSize, long anInitialCasHeapSize, JmxManagement aJmxManagement)
throws Exception {
super(aParentController, aComponentCasPoolSize, anInitialCasHeapSize, anEndpointName,
anAnalysisEngineDescriptor, aCasManager, anInProcessCache, null, aJmxManagement);
analysisEnginePoolSize = anAnalysisEnginePoolSize;
}
public PrimitiveAnalysisEngineController_impl(AnalysisEngineController aParentController,
String anEndpointName, String anAnalysisEngineDescriptor, AsynchAECasManager aCasManager,
InProcessCache anInProcessCache, int aWorkQueueSize, int anAnalysisEnginePoolSize,
JmxManagement aJmxManagement) throws Exception {
this(aParentController, anEndpointName, anAnalysisEngineDescriptor, aCasManager,
anInProcessCache, aWorkQueueSize, anAnalysisEnginePoolSize, 0, aJmxManagement);
}
public int getAEInstanceCount() {
return analysisEnginePoolSize;
}
public void initializeAnalysisEngine() throws ResourceInitializationException {
ResourceSpecifier rSpecifier = null;
try {
// Acquire single-permit semaphore to serialize instantiation of AEs.
// This is done to control access to non-thread safe structures in the
// core. The sharedInitSemaphore is a static and is shared by all instances
// of this class.
sharedInitSemaphore.acquire();
// Parse the descriptor in the calling thread.
rSpecifier = UimaClassFactory.produceResourceSpecifier(super.aeDescriptor);
AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier, paramsMap);
if (aeInstancePool == null) {
aeInstancePool = new AnalysisEngineInstancePoolWithThreadAffinity(analysisEnginePoolSize);
}
if (analysisEngineMetadata == null) {
analysisEngineMetadata = ae.getAnalysisEngineMetaData();
}
// Check if OperationalProperties allow replication of this AE. Throw exception if
// the deployment descriptor says to scale the service *but* the AE descriptor's
// OperationalProperties disallow it.
if ( !analysisEngineMetadata.getOperationalProperties().isMultipleDeploymentAllowed() &&
aeInstancePool.size() >= 1 ) {
throw new ResourceInitializationException( UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_multiple_deployment_not_allowed__WARNING", new Object[] {this.getComponentName(), ae.getMetaData().getName()});
}
aeInstancePool.checkin(ae);
if (aeInstancePool.size() == analysisEnginePoolSize) {
try {
postInitialize();
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"initializeAnalysisEngine", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
throw new ResourceInitializationException(e);
}
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"initializeAnalysisEngine", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
super.notifyListenersWithInitializationStatus(e);
if (isTopLevelComponent()) {
super.notifyListenersWithInitializationStatus(e);
} else {
// get the top level controller to notify
AnalysisEngineController controller = this.getParentController();
while (!controller.isTopLevelComponent()) {
controller = controller.getParentController();
}
getParentController().notifyListenersWithInitializationStatus(e);
}
throw new ResourceInitializationException(e);
} finally {
// Release the shared semaphore so that another instance of this class can instantiate
// an Analysis Engine.
sharedInitSemaphore.release();
}
}
public boolean threadAssignedToAE() {
if (aeInstancePool == null) {
return false;
}
return aeInstancePool.exists();
}
public void initialize() throws AsynchAEException {
}
/**
* This method is called after all AE instances initialize. It is called once. It initializes
* service Cas Pool, notifies the deployer that initialization completed and finally loweres a
* semaphore allowing messages to be processed.
*
* @throws AsynchAEException
*/
private void postInitialize() throws AsynchAEException {
try {
if (errorHandlerChain == null) {
super.plugInDefaultErrorHandlerChain();
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, getClass().getName(), "initialize",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_primitive_ctrl_init_info__CONFIG", new Object[] { analysisEnginePoolSize });
}
if (serviceInfo == null) {
serviceInfo = new PrimitiveServiceInfo(isCasMultiplier(), this);
}
serviceInfo.setServiceKey(delegateKey);
serviceInfo.setAnalysisEngineInstanceCount(analysisEnginePoolSize);
if (!isStopped()) {
getMonitor().setThresholds(getErrorHandlerChain().getThresholds());
// Initialize Cas Manager
if (getCasManagerWrapper() != null) {
try {
if (getCasManagerWrapper().isInitialized()) {
getCasManagerWrapper().addMetadata(getAnalysisEngineMetadata());
if (isTopLevelComponent()) {
getCasManagerWrapper().initialize("PrimitiveAEService");
CAS cas = getCasManagerWrapper().getNewCas("PrimitiveAEService");
cas.release();
}
}
if (isTopLevelComponent()) {
super.notifyListenersWithInitializationStatus(null);
}
// All internal components of this Primitive have been initialized. Open the latch
// so that this service can start processing requests.
latch.openLatch(getName(), isTopLevelComponent(), true);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"postInitialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
throw new AsynchAEException(e);
}
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, getClass().getName(),
"postInitialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_cas_manager_wrapper_notdefined__CONFIG", new Object[] {});
}
}
}
((BaseAnalysisEngineController) this).startServiceCleanupThread(30000); // sleep for 30 secs
} catch (AsynchAEException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"postInitialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
throw e;
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"postInitialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
throw new AsynchAEException(e);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "postInitialize",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_initialized_controller__INFO",
new Object[] { getComponentName() });
}
super.serviceInitialized = true;
}
/**
* Forces initialization of a Cas Pool if this is a Cas Multiplier delegate collocated with an
* aggregate. The parent aggregate calls this method when all type systems have been merged.
*/
public synchronized void onInitialize() {
// Component's Cas Pool is registered lazily, when the process() is called for
// the first time. For monitoring purposes, we need the comoponent's Cas Pool
// MBeans to register during initialization of the service. For a Cas Multiplier
// force creation of the Cas Pool and registration of a Cas Pool with the JMX Server.
// Just get the CAS and release it back to the component's Cas Pool.
if (isCasMultiplier() && !isTopLevelComponent()) {
CAS cas = (CAS) getUimaContext().getEmptyCas(CAS.class);
cas.release();
}
}
/**
*
*/
public void collectionProcessComplete(Endpoint anEndpoint)// throws AsynchAEException
{
AnalysisEngine ae = null;
long start = super.getCpuTime();
localCache.dumpContents();
try {
ae = aeInstancePool.checkout();
if (ae != null) {
ae.collectionProcessComplete();
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(),
"collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_cpc_all_cases_processed__FINEST", new Object[] { getComponentName() });
}
getServicePerformance().incrementAnalysisTime(super.getCpuTime() - start);
if (!anEndpoint.isRemote()) {
UimaTransport transport = getTransport(anEndpoint.getEndpoint());
UimaMessage message = transport.produceMessage(AsynchAEMessage.CollectionProcessComplete,
AsynchAEMessage.Response, getName());
// Send CPC completion reply back to the client. Use internal (non-jms) transport
transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
} else {
getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, anEndpoint);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(),
"collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_cpc_completed__FINE", new Object[] { getComponentName() });
}
} catch (Exception e) {
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.CollectionProcessComplete);
errorContext.add(AsynchAEMessage.Endpoint, anEndpoint);
getErrorHandlerChain().handle(e, errorContext, this);
} finally {
clearStats();
if (ae != null) {
try {
aeInstancePool.checkin(ae);
} catch (Exception ex) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_unable_to_check_ae_back_to_pool__WARNING",
new Object[] { getComponentName(), ex });
}
}
}
}
}
/**
* This is called when a Stop request is received from a client. Add the provided Cas id to the
* list of aborted CASes. The process() method checks this list to determine if it should continue
* generating children.
*
* @param aCasReferenceId
* - Id of an input CAS. The client wants to stop generation of child CASes from this
* CAS.
*
* @return
*/
public void process(CAS aCAS, String aCasReferenceId, Endpoint anEndpoint) {
if (stopped) {
return;
}
CasStateEntry parentCasStateEntry = null;
try {
parentCasStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
return;
}
long totalProcessTime = 0; // stored total time spent producing ALL CASes
boolean inputCASReturned = false;
boolean processingFailed = false;
// This is a primitive controller. No more processing is to be done on the Cas. Mark the
// destination as final and return CAS in reply.
anEndpoint.setFinal(true);
AnalysisEngine ae = null;
try {
// Checkout an instance of AE from the pool
ae = aeInstancePool.checkout();
// Get input CAS entry from the InProcess cache
long time = super.getCpuTime();
CasIterator casIterator = ae.processAndOutputNewCASes(aCAS);
// Store how long it took to call processAndOutputNewCASes()
totalProcessTime = (super.getCpuTime() - time);
long sequence = 1;
long hasNextTime = 0; // stores time in hasNext()
long getNextTime = 0; // stores time in next();
boolean moreCASesToProcess = true;
boolean casAbortedDueToExternalRequest = false;
while (moreCASesToProcess) {
long timeToProcessCAS = 0; // stores time in hasNext() and next() for each CAS
hasNextTime = super.getCpuTime();
if (!casIterator.hasNext()) {
moreCASesToProcess = false;
// Measure how long it took to call hasNext()
timeToProcessCAS = (super.getCpuTime() - hasNextTime);
totalProcessTime += timeToProcessCAS;
break; // from while
}
// Measure how long it took to call hasNext()
timeToProcessCAS = (super.getCpuTime() - hasNextTime);
getNextTime = super.getCpuTime();
CAS casProduced = casIterator.next();
// Add how long it took to call next()
timeToProcessCAS += (super.getCpuTime() - getNextTime);
// Add time to call hasNext() and next() to the running total
totalProcessTime += timeToProcessCAS;
casAbortedDueToExternalRequest = abortGeneratingCASes(aCasReferenceId);
// If the service is stopped or aborted, stop generating new CASes and just return the input
// CAS
if (stopped || casAbortedDueToExternalRequest) {
if (getInProcessCache() != null && getInProcessCache().getSize() > 0
&& getInProcessCache().entryExists(aCasReferenceId)) {
try {
// Set a flag on the input CAS to indicate that the processing was aborted
getInProcessCache().getCacheEntryForCAS(aCasReferenceId).setAborted(true);
} catch (Exception e) {
// An exception be be thrown here if the service is being stopped.
// The top level controller may have already cleaned up the cache
// and the getCacheEntryForCAS() will throw an exception. Ignore it
// here, we are shutting down.
} finally {
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// We are terminating the iterator here, release the internal CAS lock
// so that we can release the CAS. This approach may need to be changed
// as there may potentially be a problem with a Class Loader.
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
((CASImpl) aCAS).enableReset(true);
try {
// We are either stopping the service or aborting input CAS due to explicit STOP
// request
// from a client. If a new CAS was produced, release it back to the pool.
if (casProduced != null) {
casProduced.release();
}
} catch (Exception e) {
System.out.println("Controller:" + getComponentName()
+ " Attempt to release CAS Failed");
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
getClass().getName(),
"process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_stopped_producing_new_cases__INFO",
new Object[] { Thread.currentThread().getId(), getComponentName(),
aCasReferenceId });
}
System.out.println(">>>> Cas Multiplier:" + getComponentName()
+ " Stopped Generating CASes from Input CAS:" + aCasReferenceId);
}
}
if (casAbortedDueToExternalRequest) {
// The controller was told to stop generating new CASes. Just return the input CAS to
// the
// client
// throw new ResourceProcessException(new
// InterruptedException("Cas Multiplier:"+getComponentName()+" Aborted CAS:"+aCasReferenceId));
break; // break out of the cas producing loop and return an input CAS to the client
} else {
// The controller is stopping
return;
}
}
OutOfTypeSystemData otsd = getInProcessCache().getOutOfTypeSystemData(aCasReferenceId);
MessageContext mContext = getInProcessCache()
.getMessageAccessorByReference(aCasReferenceId);
CacheEntry newEntry = getInProcessCache().register(casProduced, mContext, otsd);
// if this Cas Multiplier is not Top Level service, add new Cas Id to the private
// cache of the parent aggregate controller. The Aggregate needs to know about
// all CASes it has in play that were generated from the input CAS.
CasStateEntry childCasStateEntry = null;
if (!isTopLevelComponent()) {
newEntry.setNewCas(true, parentController.getComponentName());
// Create CAS state entry in the aggregate's local cache
childCasStateEntry = parentController.getLocalCache().createCasStateEntry(
newEntry.getCasReferenceId());
// Fetch the parent CAS state entry from the aggregate's local cache. We need to increment
// number of child CASes associated with it.
parentCasStateEntry = parentController.getLocalCache().lookupEntry(aCasReferenceId);
} else {
childCasStateEntry = getLocalCache().createCasStateEntry(newEntry.getCasReferenceId());
}
// Associate parent CAS (input CAS) with the new CAS.
childCasStateEntry.setInputCasReferenceId(aCasReferenceId);
// Increment number of child CASes generated from the input CAS
parentCasStateEntry.incrementSubordinateCasInPlayCount();
// Associate input CAS with the new CAS
newEntry.setInputCasReferenceId(aCasReferenceId);
newEntry.setCasSequence(sequence);
// Add to the cache how long it took to process the generated (subordinate) CAS
getCasStatistics(newEntry.getCasReferenceId()).incrementAnalysisTime(timeToProcessCAS);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
getClass().getName(),
"process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_produced_new_cas__FINE",
new Object[] { Thread.currentThread().getName(),
getUimaContextAdmin().getQualifiedContextName(),
newEntry.getCasReferenceId(), aCasReferenceId });
}
// Add the generated CAS to the outstanding CAS Map. Client notification will release
// this CAS back to its pool
synchronized (syncObject) {
if (isTopLevelComponent()) {
// Add the id of the generated CAS to the map holding outstanding CASes. This
// map will be referenced when a client sends Free CAS Notification. The map
// stores the id of the CAS both as a key and a value. Map is used to facilitate
// quick lookup
cmOutstandingCASes.put(newEntry.getCasReferenceId(), newEntry.getCasReferenceId());
}
// Increment number of CASes processed by this service
sequence++;
}
if (!anEndpoint.isRemote()) {
UimaTransport transport = getTransport(anEndpoint.getEndpoint());
UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
AsynchAEMessage.Request, getName());
message.addStringProperty(AsynchAEMessage.CasReference, newEntry.getCasReferenceId());
message.addStringProperty(AsynchAEMessage.InputCasReference, aCasReferenceId);
message.addLongProperty(AsynchAEMessage.CasSequence, sequence);
ServicePerformance casStats = getCasStatistics(aCasReferenceId);
message.addLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats
.getRawCasSerializationTime());
message.addLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats
.getRawCasDeserializationTime());
message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
message.addLongProperty(AsynchAEMessage.IdleTime, iT);
if (!stopped) {
transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
}
} else {
// Send generated CAS to the client
if (!stopped) {
getOutputChannel().sendReply(newEntry, anEndpoint);
}
}
// Remove the new CAS state entry from the local cache if this a top level primitive.
// If not top level, the client (an Aggregate) will remove this entry when this new
// generated CAS reaches Final State.
if (isTopLevelComponent()) {
try {
localCache.lookupEntry(newEntry.getCasReferenceId()).setDropped(true);
} catch (Exception e) {
}
localCache.remove(newEntry.getCasReferenceId());
}
// Remove Stats from the global Map associated with the new CAS
// These stats for this CAS were added to the response message
// and are no longer needed
dropCasStatistics(newEntry.getCasReferenceId());
} // while
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
getClass().getName(),
"process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_completed_analysis__FINEST",
new Object[] { Thread.currentThread().getName(), getComponentName(),
aCasReferenceId, (double) (super.getCpuTime() - time) / (double) 1000000 });
}
getMonitor().resetCountingStatistic("", Monitor.ProcessErrorCount);
// Set total number of children generated from this CAS
// Store total time spent processing this input CAS
getCasStatistics(aCasReferenceId).incrementAnalysisTime(totalProcessTime);
if (!anEndpoint.isRemote()) {
inputCASReturned = true;
UimaTransport transport = getTransport(anEndpoint.getEndpoint());
UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
AsynchAEMessage.Response, getName());
message.addStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
ServicePerformance casStats = getCasStatistics(aCasReferenceId);
message.addLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats
.getRawCasSerializationTime());
message.addLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats
.getRawCasDeserializationTime());
message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
message.addLongProperty(AsynchAEMessage.IdleTime, iT);
// Send reply back to the client. Use internal (non-jms) transport
if (!stopped) {
transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
}
} else {
if (!stopped) {
getOutputChannel().sendReply(aCasReferenceId, anEndpoint);
}
inputCASReturned = true;
}
// Remove input CAS state entry from the local cache
if (!isTopLevelComponent()) {
localCache.lookupEntry(aCasReferenceId).setDropped(true);
localCache.remove(aCasReferenceId);
}
} catch (Throwable e) {
processingFailed = true;
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
errorContext.add(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
errorContext.add(AsynchAEMessage.Endpoint, anEndpoint);
// Handle the exception. Pass reference to the PrimitiveController instance
getErrorHandlerChain().handle(e, errorContext, this);
} finally {
dropCasStatistics(aCasReferenceId);
if (ae != null) {
try {
aeInstancePool.checkin(ae);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
}
}
// drop the CAS if it has been successfully processed. If there was a failure, the Error
// Handler
// will drop the CAS
if (isTopLevelComponent() && !processingFailed) {
// Release CASes produced from the input CAS if the input CAS has been aborted
if (abortGeneratingCASes(aCasReferenceId)) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_remove_cache_entry__INFO",
new Object[] { getComponentName(), aCasReferenceId });
}
getInProcessCache().releaseCASesProducedFromInputCAS(aCasReferenceId);
} else if (inputCASReturned && isTopLevelComponent()) {
// Remove input CAS cache entry if the CAS has been sent to the client
dropCAS(aCasReferenceId, true);
localCache.dumpContents();
}
}
}
}
private void addConfigIntParameter(String aParamName, int aParamValue) {
ConfigurationParameter cp = new ConfigurationParameter_impl();
cp.setMandatory(false);
cp.setMultiValued(false);
cp.setName(aParamName);
cp.setType("Integer");
getAnalysisEngineMetadata().getConfigurationParameterDeclarations().addConfigurationParameter(
cp);
getAnalysisEngineMetadata().getConfigurationParameterSettings().setParameterValue(aParamName,
aParamValue);
}
// Return metadata
public void sendMetadata(Endpoint anEndpoint) throws AsynchAEException {
addConfigIntParameter(AnalysisEngineController.AEInstanceCount, analysisEnginePoolSize);
if (getAnalysisEngineMetadata().getOperationalProperties().getOutputsNewCASes()) {
addConfigIntParameter(AnalysisEngineController.CasPoolSize, super.componentCasPoolSize);
}
super.sendMetadata(anEndpoint, getAnalysisEngineMetadata());
}
private AnalysisEngineMetaData getAnalysisEngineMetadata() {
return analysisEngineMetadata;
}
/**
* Executes action on error. Primitive Controller allows two types of actions TERMINATE and
* DROPCAS.
*/
public void takeAction(String anAction, String anEndpointName, ErrorContext anErrorContext) {
try {
if (ErrorHandler.TERMINATE.equalsIgnoreCase(anAction)
|| ErrorHandler.DROPCAS.equalsIgnoreCase(anAction)) {
super.handleAction(anAction, anEndpointName, anErrorContext);
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"takeAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
}
}
public String getServiceEndpointName() {
return getName();
}
public synchronized ControllerLatch getControllerLatch() {
return latch;
}
public boolean isPrimitive() {
return true;
}
public Monitor getMonitor() {
return super.monitor;
}
public void setMonitor(Monitor monitor) {
this.monitor = monitor;
}
public void handleDelegateLifeCycleEvent(String anEndpoint, int aDelegateCount) {
if (aDelegateCount == 0) {
// tbi
}
}
protected String getNameFromMetadata() {
return super.getMetaData().getName();
}
public void setAnalysisEngineInstancePool(AnalysisEngineInstancePool aPool) {
aeInstancePool = aPool;
}
public PrimitiveServiceInfo getServiceInfo() {
if (serviceInfo == null) {
serviceInfo = new PrimitiveServiceInfo(isCasMultiplier(), this);
serviceInfo.setServiceKey(delegateKey);
}
if (isTopLevelComponent() && getInputChannel() != null) {
serviceInfo.setInputQueueName(getInputChannel().getServiceInfo().getInputQueueName());
serviceInfo.setBrokerURL(super.getBrokerURL());
serviceInfo.setDeploymentDescriptorPath(super.aeDescriptor);
}
return serviceInfo;
}
public void stop() {
super.stop();
if (aeInstancePool != null) {
try {
aeInstancePool.destroy();
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"stop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
}
}
try {
for (Entry<String, UimaTransport> transport : transports.entrySet()) {
transport.getValue().stopIt();
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"stop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", new Object[] { e });
}
}
if (cmOutstandingCASes != null) {
if (!cmOutstandingCASes.isEmpty()) {
// If there are outstanding CASes, force them to be released
// If the CM is blocking on getCAS() this will unblock it and
// enable termination. Otherwise, a hang may occur
Iterator<String> it = cmOutstandingCASes.keySet().iterator();
while (it.hasNext()) {
String casId = it.next();
try {
CacheEntry entry = getInProcessCache().getCacheEntryForCAS(casId);
if (entry != null && entry.getCas() != null) {
System.out.println("Primitive:" + getComponentName() + " Forcing Release of CAS:"
+ casId + " in stop()");
// Force CAS release to unblock CM thread
entry.getCas().release();
}
} catch (Exception e) {
System.out.println("Controller:" + getComponentName() + " CAS:" + casId
+ " Not Found In Cache");
}
}
}
cmOutstandingCASes.clear();
}
if (aeList != null) {
aeList.clear();
aeList = null;
}
System.out.println("Service:" + getComponentName() + " Stopped");
}
}