blob: 419729010415edb7d4f190a0305f66d0e890a55b [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.uima.collection.impl.cpm.engine;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.uima.UIMAFramework;
import org.apache.uima.adapter.vinci.util.Descriptor;
import org.apache.uima.analysis_engine.AnalysisEngine;
import org.apache.uima.analysis_engine.impl.AnalysisEngineImplBase;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.impl.CASImpl;
import org.apache.uima.collection.CasConsumer;
import org.apache.uima.collection.CasInitializer;
import org.apache.uima.collection.CollectionReader;
import org.apache.uima.collection.EntityProcessStatus;
import org.apache.uima.collection.StatusCallbackListener;
import org.apache.uima.collection.base_cpm.AbortCPMException;
import org.apache.uima.collection.base_cpm.BaseCollectionReader;
import org.apache.uima.collection.base_cpm.BaseStatusCallbackListener;
import org.apache.uima.collection.base_cpm.CasDataCollectionReader;
import org.apache.uima.collection.base_cpm.CasDataConsumer;
import org.apache.uima.collection.base_cpm.CasDataInitializer;
import org.apache.uima.collection.base_cpm.CasObjectProcessor;
import org.apache.uima.collection.base_cpm.CasProcessor;
import org.apache.uima.collection.base_cpm.RecoverableCollectionReader;
import org.apache.uima.collection.base_cpm.SkipCasException;
import org.apache.uima.collection.impl.EntityProcessStatusImpl;
import org.apache.uima.collection.impl.base_cpm.container.CasProcessorConfiguration;
import org.apache.uima.collection.impl.base_cpm.container.ProcessingContainer;
import org.apache.uima.collection.impl.base_cpm.container.deployer.CasProcessorDeployer;
import org.apache.uima.collection.impl.base_cpm.container.deployer.CasProcessorDeploymentException;
import org.apache.uima.collection.impl.cpm.CheckpointData;
import org.apache.uima.collection.impl.cpm.Constants;
import org.apache.uima.collection.impl.cpm.container.CPEFactory;
import org.apache.uima.collection.impl.cpm.container.deployer.DeployFactory;
import org.apache.uima.collection.impl.cpm.container.deployer.socket.ProcessControllerAdapter;
import org.apache.uima.collection.impl.cpm.utils.CPMUtils;
import org.apache.uima.collection.impl.cpm.utils.ChunkMetadata;
import org.apache.uima.collection.impl.cpm.utils.CpmLocalizedMessage;
import org.apache.uima.collection.impl.cpm.utils.TimerFactory;
import org.apache.uima.collection.metadata.CpeCasProcessor;
import org.apache.uima.collection.metadata.CpeCasProcessors;
import org.apache.uima.collection.metadata.CpeConfiguration;
import org.apache.uima.collection.metadata.CpeDescription;
import org.apache.uima.internal.util.JavaTimer;
import org.apache.uima.resource.CasManager;
import org.apache.uima.resource.ResourceConfigurationException;
import org.apache.uima.resource.ResourceCreationSpecifier;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.resource.metadata.Capability;
import org.apache.uima.resource.metadata.OperationalProperties;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.resource.metadata.ResourceMetaData;
import org.apache.uima.util.Level;
import org.apache.uima.util.ProcessTrace;
import org.apache.uima.util.ProcessTraceEvent;
import org.apache.uima.util.Progress;
import org.apache.uima.util.UimaTimer;
import org.apache.uima.util.impl.ProcessTrace_impl;
* Responsible for creating and initializing processing threads. This instance manages the lifecycle
* of the CPE components. It exposes API for plugging in components programmatically instead of
* declaratively. Running in its own thread, this component creates separate Processing Pipelines
* for Analysis Engines and Cas Consumers, launches configured CollectionReader and attaches all of
* those components to form a pipeline from source to sink. The Collection Reader feeds Processing
* Threads containing Analysis Engines, and Analysis Engines feed results of analysis to Cas
* Consumers.
public class CPMEngine extends Thread {
// Upper bound used when waiting on a queue. NOTE(review): its use and unit are not visible in
// this section - confirm.
private static final int MAX_WAIT_ON_QUEUE = 400;
// NOTE(review): use not visible in this section; presumably an interval (in processed CASes)
// for progress messages - confirm.
private static final int CAS_PROCESSED_MSG = 1000;
// Deployment value (compared case-insensitively) that selects the single-threaded CPE mode.
private static final String SINGLE_THREADED_MODE = "single-threaded";
// CAS pool (presumably shared by the processing pipelines); killIt() synchronizes on it.
public CPECasPool casPool;
// Used internally for synchronization; guards the `pause` flag below.
public final Object lockForPause = new Object();
// CollectionReader to be used by this CPM
private BaseCollectionReader collectionReader = null;
// Flag indicating if the CPM should pause
// Accesses to this flag (read and write) must
// be done while holding the "lockForPause" lock
// via synch
// @GuardedBy(lockForPause)
protected boolean pause = false;
// Flag indicating if this CPM is running or not
// Marked volatile because it is set and read on different threads without synchronization
protected volatile boolean isRunning = false;
// Flag indicating if this CPM has been stopped
// Marked volatile because it is set and read on different threads without synchronization
protected volatile boolean stopped = false;
// Flag indicating if this CPM has been killed
// Marked volatile because it is set and read on different threads without synchronization
protected volatile boolean killed = false;
// Flag indicating if this CPM should be paused on exception
private boolean pauseOnException = false;
// List of all annotators
private LinkedList annotatorList = new LinkedList();
// Instance lists of parallelizable Cas Processors, one list per processor name
// (filled by addParallizableCasProcessor()).
private LinkedList annotatorDeployList = new LinkedList();
// List of CasConsumers
private LinkedList consumerList = new LinkedList();
// Instance lists of Cas Processors placed in the Cas Consumer pipeline, one list per name.
private LinkedList consumerDeployList = new LinkedList();
// Number of entities this CPM must process.
private long numToProcess = -1;
// CAS pool size, set via setPoolSize().
private int poolSize = 0;
// ProcessTrace aggregating the CPM's performance stats
private ProcessTrace procTr = null;
// private EntityProcessStatusImpl enProcSt = null;
// used during the recovery stage after CPM failure or forced shutdown
// private ProcessTrace restoredProcTr = null;
// Map for storing runtime statistics. used for reporting
private Map stats = new HashMap();
// List of all callback listeners
private ArrayList statusCbL = new ArrayList();
// Number of entities to fetch for every getNext()
private int readerFetchSize = 1;
// Size of the work queue. This queue is shared among processing units with deployed annotators.
// The ArtifactProducer deposits entities into this queue, while ProcessingUnits dequeue them.
private int inputQueueSize = 1;
// Size of the output queue. This queue is shared with deployed casconsumers.
private int outputQueueSize = 1;
// Number of concurrent processing units (pipelines)
private int concurrentThreadCount = 1;
// Maps a parallelizable Cas Processor name to its instance list.
private Hashtable analysisEngines = new Hashtable();
// Maps the name of a Cas Processor in the Cas Consumer pipeline to its instance list.
private Hashtable consumers = new Hashtable();
private CasProcessor[] casprocessorList;
// Component responsible for asynchronous read from the CollectionReader. It places Cas'es into
// work Queue
private ArtifactProducer producer = null;
// Factory responsible for instantiating CPE components from CPE descriptor
private CPEFactory cpeFactory = null;
// An array holding instances of components responsible for analysis
protected ProcessingUnit[] processingUnits = null;
// Instantiate a Processing Unit containing CasConsumers. There may be many Analysis Processing
// Units
// but there is one CasConsumer Processing Unit ( at least for now).
private ProcessingUnit casConsumerPU = null;
// Queue where result of analysis goes to be consumed by Consumers
protected BoundedWorkQueue outputQueue = null;
// Queue where Cas'es meant for analysis are deposited by ArtifactProducer
protected BoundedWorkQueue workQueue = null;
private CheckpointData checkpointData = null;
// Set by addCasProcessor() when a CasObjectProcessor or CasConsumer is added.
private boolean mixedCasProcessorTypeSupport = false;
// Performance tuning properties. NOTE(review): the initializer below appears truncated in this
// view (the expression after "UIMAFramework" is missing) - confirm against the full file.
private Properties mPerformanceTuningSettings = UIMAFramework
private DebugControlThread dbgCtrlThread = null;
private ProcessControllerAdapter pca = null;
private int activeProcessingUnits = 1;
// Set by killIt() and reported by isHardKilled().
// NOTE(review): not volatile, unlike killed/stopped - confirm cross-thread visibility is not needed.
private boolean hardKill = false;
private Hashtable skippedDocs = new Hashtable();
private Capability[] definedCapabilities = null;
private boolean needsTCas = true;
private long crFetchTime = 0;
// Reader state reported by dumpState() in single-threaded mode.
private int readerState = 0;
// Copied in the constructor from the CPE descriptor's CpeCasProcessors.getDropCasOnException().
private boolean dropCasOnExceptionPolicy = false;
// Set by the constructor when the CPE deployment equals SINGLE_THREADED_MODE.
private boolean singleThreadedCPE = false;
private NonThreadedProcessingUnit nonThreadedProcessingUnit = null;
private NonThreadedProcessingUnit nonThreadedCasConsumerProcessingUnit = null;
// Each element is a list of instances sharing one processor name; built by addCasProcessor()
// and walked (last to first) by classifyCasProcessors().
private LinkedList initial_cp_list = new LinkedList(); // this list is used to hold Cas
// Processors
// It contains both AEs and CCs.
private boolean casProcessorsDeployed = false;
private boolean consumerThreadStarted = false;
private boolean readerThreadStarted = false;
private int[] processingThreadsState = null;
* Initializes Collection Processing Engine. Assigns this thread and all processing threads
* created by this component to a common Thread Group.
* @param aThreadGroup -
* contains all CPM related threads
* @param aCpeFactory -
* CPE factory object responsible for parsing cpe descriptor and creating components
* @param aProcTr -
* instance of the ProcessTrace where the CPM accumulates stats
* @param aCheckpointData -
* checkpoint object facilitating restart from the last known point
// Creates the engine thread (named "CPMEngine Thread") inside aThreadGroup. It records the
// factory and ProcessTrace, chooses single- vs multi-threaded mode from the CPE configuration's
// deployment setting, and reads the drop-CAS-on-exception policy from the CPE descriptor.
public CPMEngine(CPMThreadGroup aThreadGroup, CPEFactory aCpeFactory, ProcessTrace aProcTr,
CheckpointData aCheckpointData) throws Exception {
super(aThreadGroup, "CPMEngine Thread");
cpeFactory = aCpeFactory;
// Accumulate trace info in provided ProcessTrace instance
procTr = aProcTr;
// Determine in which mode to start the engine: single or multi-threaded.
// The deployment string is compared case-insensitively to SINGLE_THREADED_MODE.
// NOTE(review): cpeFactory is dereferenced here without a null check, although a null check on
// the same field is performed further below; a null aCpeFactory would fail here first.
// NOTE(review): getDeployment() is assumed non-null whenever a CPE config exists - confirm.
if (cpeFactory.getCPEConfig() != null
&& cpeFactory.getCPEConfig().getDeployment().equalsIgnoreCase(SINGLE_THREADED_MODE)) {
// NOTE(review): the guard checks the root logger while the message goes to the class logger.
// The logrb calls in this method also appear to be missing their method/bundle/key arguments
// in this view - confirm against the full file.
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
singleThreadedCPE = true;
} else {
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
checkpointData = aCheckpointData;
// enProcSt = new EntityProcessStatusImpl(procTr);
// Read the drop-CAS-on-exception policy from the descriptor's CpeCasProcessors, tolerating
// a missing factory, descriptor, or processors section (the field keeps its default).
CPEFactory factory = this.cpeFactory;
if (factory != null) {
CpeDescription desc = factory.getCpeDescriptor();
if (desc != null) {
CpeCasProcessors proc = desc.getCpeCasProcessors();
if (proc != null) {
dropCasOnExceptionPolicy = proc.getDropCasOnException();
* Returns a list of Processing Containers for Analysis Engines. Each CasProcessor is managed by
* its own container.
public LinkedList getProcessingContainers() {
return annotatorList;
* Returns a list of All Processing Containers. Each CasProcessor is managed by its own container.
public LinkedList getAllProcessingContainers() {
LinkedList all = new LinkedList();
return all;
* Returns number of processing threads
* @return - number of processing threads
* @throws ResourceConfigurationException
public int getThreadCount() throws ResourceConfigurationException {
return cpeFactory.getProcessingUnitThreadCount();
* Plugs in a map where the engine stores perfomance info at runtime
* @param aMap -
* map for runtime stats and totals
public void setStats(Map aMap) {
stats = aMap;
* Returns CPE stats
* @return Map containing CPE stats
public Map getStats() {
return stats;
* Sets a global flag to indicate to the CPM that it should pause whenever exception occurs
* @param aPause -
* true if pause is requested on exception, false otherwise
public void setPauseOnException(boolean aPause) {
pauseOnException = aPause;
* Returns if the CPM should pause when exception occurs
* @return - true if the CPM pauses when exception occurs, false otherwise
public boolean isPauseOnException() {
return pauseOnException;
* Defines the size of inputQueue. The queue stores this many entities read from the
* CollectionReader. Every processing pipeline thread will read its entities from this input
* queue. The CollectionReader is decoupled from the consumer of entities, and continuously
* replenishes the input queue.
* @param aBatchSize
* the size of the batch.
public void setInputQueueSize(int aInputQueueSize) {
inputQueueSize = aInputQueueSize;
* Defines the size of outputQueue. The queue stores this many entities enqueued by every
* processing pipeline thread.The results of analysis are dumped into this queue for consumer
* thread to consume its contents.
* @param aBatchSize
* the size of the batch.
public void setOutputQueueSize(int aOutputQueueSize) {
outputQueueSize = aOutputQueueSize;
* Defines the size of Cas Pool.
* @param aPoolSize
* the size of the Cas pool.
public void setPoolSize(int aPoolSize) {
poolSize = aPoolSize;
public int getPoolSize() {
return poolSize;
* Defines number of threads executing the processing pipeline concurrently.
* @param aBatchSize
* the size of the batch.
public void setConcurrentThreadSize(int aConcurrentThreadSize) {
concurrentThreadCount = aConcurrentThreadSize;
* (non-Javadoc)
* @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener)
public void addStatusCallbackListener(BaseStatusCallbackListener aListener) {
* Returns a list of ALL callback listeners currently registered with the CPM
* @return -
public ArrayList getCallbackListeners() {
return statusCbL;
* Unregisters given listener from the CPM
* @param aListener -
* instance of {@link BaseStatusCallbackListener} to unregister
public void removeStatusCallbackListener(BaseStatusCallbackListener aListener) {
* Returns true if this engine has been killed
* @return
public boolean isKilled() {
return killed;
* Dumps some internal state of the CPE. Used for debugging.
// Debugging aid: logs the thread state of the reader (or producer), of every processing unit and
// of the Cas Consumer processing unit. Any exception is deliberately ignored (see the catch).
// NOTE(review): in this view each logging call is missing its opening line(s); only the
// argument arrays remain - confirm against the full file.
private void dumpState() {
try {
// Single-threaded deployment: report the engine's own readerState.
if (cpeFactory.getCPEConfig() != null
&& cpeFactory.getCPEConfig().getDeployment().equalsIgnoreCase(SINGLE_THREADED_MODE)) {
new Object[] { Thread.currentThread().getName(),
String.valueOf(this.readerState) });
// NOTE(review): processingUnits[i] is not null-checked here (stopIt() checks it); a null slot
// would throw and be swallowed by the catch below.
for (int i = 0; processingUnits != null && i < processingUnits.length; i++) {
new Object[] { Thread.currentThread().getName(), String.valueOf(i),
String.valueOf(processingUnits[i].threadState) });
if (casConsumerPU != null) {
new Object[] { Thread.currentThread().getName(),
String.valueOf(casConsumerPU.threadState) });
} else {
// Any other deployment: report the ArtifactProducer's thread state instead of readerState.
if (producer != null) {
new Object[] { Thread.currentThread().getName(),
String.valueOf(producer.threadState) });
for (int i = 0; processingUnits != null && i < processingUnits.length; i++) {
new Object[] { Thread.currentThread().getName(), String.valueOf(i),
String.valueOf(processingUnits[i].threadState) });
if (casConsumerPU != null) {
new Object[] { Thread.currentThread().getName(),
String.valueOf(casConsumerPU.threadState) });
} catch (Exception e) { // ignore. This is called on stop()
* Kills the CPM the hard way. None of the entities in the queues will be processed. This method simply
* empties all queues and at the end adds EOFToken to the work queue so that all threads go away.
// Kills the CPM the hard way. It clears isRunning, sets killed and hardKill, empties the work
// and output queues, and finally puts an EOFToken on the work queue so the processing threads
// exit.
// NOTE(review): the bodies of the draining loops and the enqueue call are not visible in this
// view; the summary above follows this method's documentation - confirm against the full file.
public void killIt() {
isRunning = false;
killed = true;
hardKill = true;
if (UIMAFramework.getLogger().isLoggable(Level.INFO)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.INFO, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_killing_cpm__INFO",
new Object[] { Thread.currentThread().getName() });
// Empty the work queue; its entries are not processed.
if (workQueue != null) {
while (workQueue.getCurrentSize() > 0) {
// Empty the output queue; its entries do not reach the Cas Consumers.
if (outputQueue != null) {
while (outputQueue.getCurrentSize() > 0) {
// NOTE(review): the work done while holding the casPool monitor is not visible here.
if (casPool != null) {
synchronized (casPool) {
// Wake up the processing threads with an end-of-processing marker.
if (workQueue != null) {
Object[] eofToken = new Object[1];
// only need one member in the array
eofToken[0] = new EOFToken();
// NOTE(review): unlike the message above, this INFO log is not guarded by isLoggable().
UIMAFramework.getLogger(this.getClass()).logrb(Level.INFO, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_terminate_pipelines__INFO",
new Object[] { Thread.currentThread().getName(), String.valueOf(killed) });
synchronized (workQueue) {
* Returns if the CPE was killed hard. Soft kill allows the CPE to finish processing all
* in-transit CASes. Hard kill causes the CPM to stop processing and to throw away all unprocessed
* CASes from its queues.
* @return
public boolean isHardKilled() {
return hardKill;
* @deprecated
// Deprecated variant of stopIt(). It logs the termination request and creates an anonymous
// Thread that builds an EOFToken. It then runs a stop sequence that mirrors stopIt():
//   - set stopped and killed;
//   - clear isRunning;
//   - clear a pending pause;
//   - loop while the work queue, then the output queue, are non-empty;
//   - iterate over the processing units.
// Exceptions caught in the stop sequence are logged at FINER.
// NOTE(review): closing braces are missing in this view, so which statements run inside the
// anonymous Thread (and whether/when it is started) cannot be determined here.
public void asynchStop() {
if (UIMAFramework.getLogger().isLoggable(Level.INFO)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.INFO, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_terminate_pipelines__INFO",
new Object[] { Thread.currentThread().getName(), String.valueOf(killed) });
new Thread() {
public void run() {
Object[] eofToken = new Object[1];
eofToken[0] = new EOFToken();
stopped = true;
killed = true;
if (!isRunning) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
// Already stopped
try {
// Change global status
isRunning = false;
// terminate this thread if the thread has been previously suspended
synchronized (lockForPause) {
if (pause) {
pause = false;
// Let processing threads finish their work by emptying all queues. Even during a hard
// stop we should try to clean things up as best as we can. First empty process queue or
// work
// queue, dump result of analysis into output queue and let the consumers process that.
// When all queues are empty we are done.
// NOTE(review): workQueue is dereferenced without the null check that stopIt() performs; a
// null queue raises an exception that is only logged by the catch below.
int cc = workQueue.getCurrentSize();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), workQueue.getName(),
String.valueOf(cc) });
while (workQueue.getCurrentSize() > 0) {
// With the DEBUG system property set, log the size when it exceeds the last recorded cc.
if (System.getProperty("DEBUG") != null) {
if (cc < workQueue.getCurrentSize()) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), workQueue.getName(),
String.valueOf(workQueue.getCurrentSize()) });
cc = workQueue.getCurrentSize();
// NOTE(review): outputQueue is not null-checked here either. This message and the one in the
// loop below pair workQueue.getName() with the OUTPUT queue size; stopIt() uses
// outputQueue.getName() for the equivalent message.
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), workQueue.getName(),
String.valueOf(outputQueue.getCurrentSize()) });
while (outputQueue.getCurrentSize() > 0) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), workQueue.getName(),
String.valueOf(outputQueue.getCurrentSize()) });
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
for (int i = 0; processingUnits != null && i < processingUnits.length; i++) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
new Object[] { Thread.currentThread().getName(), String.valueOf(i) });
} catch (Exception e) {
// NOTE(review): this FINER log is not guarded by isLoggable(), unlike the one in stopIt().
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
new Object[] { Thread.currentThread().getName(), e.getMessage() });
* Stops execution of the Processing Pipeline and this thread.
// Stops the processing pipeline and this thread. It always sets stopped and killed, and logs
// "already stopped" when the engine is not running. Otherwise it:
//   - clears isRunning and a pending pause;
//   - loops while the work queue, then the output queue, are non-empty;
//   - iterates over the processing units, logging UIMA_CPM_stop_processors__FINEST
//     (presumably to stop each one).
// Exceptions raised during the stop sequence are logged at FINER.
// NOTE(review): braces, the early return for the "already stopped" case, the loop bodies and the
// opening lines of several logging calls are not visible in this view - confirm.
public void stopIt() {
// NOTE(review): this FINEST message is not guarded by isLoggable().
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), String.valueOf(killed) });
stopped = true;
killed = true;
if (!isRunning) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_already_stopped__FINEST",
new Object[] { Thread.currentThread().getName() });
// Already stopped
try {
// Change global status
isRunning = false;
// terminate this thread if the thread has been previously suspended
synchronized (lockForPause) {
if (pause) {
pause = false;
// Let processing threads finish their work by emptying all queues. Even during a hard
// stop we should try to clean things up as best as we can. First empty process queue or work
// queue, dump result of analysis into output queue and let the consumers process that.
// When all queues are empty we are done.
if (workQueue != null) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), workQueue.getName(),
String.valueOf(workQueue.getCurrentSize()) });
int cc = workQueue.getCurrentSize();
while (workQueue.getCurrentSize() > 0) {
// With the DEBUG system property set, log the size when it exceeds the last recorded cc.
if (System.getProperty("DEBUG") != null) {
if (cc < workQueue.getCurrentSize()) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), workQueue.getName(),
String.valueOf(workQueue.getCurrentSize()) });
cc = workQueue.getCurrentSize();
if (outputQueue != null) {
while (outputQueue.getCurrentSize() > 0) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), outputQueue.getName(),
String.valueOf(outputQueue.getCurrentSize()) });
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
// NOTE(review): because `processingUnits[i] != null` is part of the loop condition, the loop
// ends at the first null slot instead of skipping it.
for (int i = 0; processingUnits != null && i < processingUnits.length
&& processingUnits[i] != null; i++) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_stop_processors__FINEST",
new Object[] { Thread.currentThread().getName(), String.valueOf(i) });
} catch (Exception e) {
if (UIMAFramework.getLogger().isLoggable(Level.FINER)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
new Object[] { Thread.currentThread().getName(), e.getMessage() });
* Returns index to a CasProcessor with a given name in a given List
* @param aDeployList -
* List of CasConsumers to be searched
* @param aName -
* name of the CasConsumer we want to find
* @return 0 - if a CasConsumer is not found in a list, else returns a position in the list where
* the CasConsumer can found
private int getIndexInList(List aDeployList, String aName) {
for (int i = 0; i < aDeployList.size(); i++) {
List innerList = (ArrayList) aDeployList.get(i);
String currentCPName = ((CasProcessor) innerList.get(0)).getProcessingResourceMetaData()
if (aName != null && aName.trim().equals(currentCPName.trim())) {
return i;
return 0;
* Find the position in the list of the Cas Processor with a given name
* @param aName
* @param aList
* @return
private int getPositionInListIfExists(String aName, List aList) {
for (int i = 0; i < aList.size(); i++) {
List innerList = (ArrayList) aList.get(i);
// Get the name of the first CP in the list. The inner list contains CPs of the same kind
String currentCPName = ((CasProcessor) innerList.get(0)).getProcessingResourceMetaData()
if (aName != null && aName.trim().equals(currentCPName.trim())) {
return i;
return -1;
* Parses Cas Processor descriptor and checks if it is parallelizable.
* @param aDescPath -
* fully qualified path to a CP descriptor
* @param aCpName -
* name of the CP
* @param isConsumer -
* true if the CP is a Cas Consumer, false otherwise
* @return - true if CP is parallelizable, false otherwise
* @throws Exception
private boolean isMultipleDeploymentAllowed(String aDescPath, String aCpName, boolean isConsumer)
throws Exception {
OperationalProperties op = null;
// Parse the descriptor to access Operational Properties
ResourceSpecifier resourceSpecifier = cpeFactory.getSpecifier(new File(aDescPath).toURL());
if (resourceSpecifier != null && resourceSpecifier instanceof ResourceCreationSpecifier) {
ResourceMetaData md = ((ResourceCreationSpecifier) resourceSpecifier).getMetaData();
if (md instanceof ProcessingResourceMetaData) {
op = ((ProcessingResourceMetaData) md).getOperationalProperties();
if (op == null) {
// Operational Properties not defined, so use defaults
if (isConsumer) {
return false; // the default for CasConsumer
return true; // default for AEs
return op.isMultipleDeploymentAllowed();
throw new ResourceConfigurationException(ResourceInitializationException.NOT_A_CAS_PROCESSOR,
new Object[] { aCpName, "<unknown>", aDescPath });
* Determines if a given Cas Processor is parallelizable. Remote Cas Processors are by default
* parallelizable. For integrated and managed the CPM consults Cas Processor's descriptor to
* determine if it is parallelizable.
* @param aProcessor -
* Cas Processor being checked
* @param aCpName -
* name of the CP
* @return - true if CP is parallelizable, false otherwise
* @throws Exception
// Decides whether aProcessor may run as multiple instances. Whenever operational properties are
// absent, Cas Consumers (CasConsumer or CasDataConsumer) default to false and all other Cas
// Processors to true.
// NOTE(review): several statements in this method appear truncated in this view (the
// OperationalProperties lookup, the config-map lookup, the final return, and the SEVERE log
// call) - confirm against the full file.
public boolean isParallizable(CasProcessor aProcessor, String aCpName) throws Exception {
boolean isConsumer = false;
if (aProcessor instanceof CasConsumer || aProcessor instanceof CasDataConsumer) {
isConsumer = true;
// casProcessorConfigMap may not contain configuration for this Cas Processor if this CP has
// been
// added dynamically via API. In this case, just go to metadata and determine via its
// OperationalProperties if this is parallelizable component.
if (!cpeFactory.casProcessorConfigMap.containsKey(aCpName)) {
OperationalProperties op = aProcessor.getProcessingResourceMetaData()
if (op != null) {
return op.isMultipleDeploymentAllowed();
// No operational properties: log at SEVERE and fall back to the type-based default.
if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), aCpName });
if (isConsumer) {
return false; // by default the CasConsumer is not parallizable
return true; // by default AEs are parallizable
// Retrieve Cas Processor's CPE descriptor configuration.
CpeCasProcessor casProcessorCPEConfig = (CpeCasProcessor) cpeFactory.casProcessorConfigMap
// Local deployment: the CPE configuration points at a client service descriptor, which in turn
// names the actual Cas Processor descriptor that is parsed for its operational properties.
if (Constants.DEPLOYMENT_LOCAL.equalsIgnoreCase(casProcessorCPEConfig.getDeployment())) {
// Extract the client service descriptor.
URL descriptorUrl = cpeFactory.getDescriptorURL(casProcessorCPEConfig);
Descriptor descriptor = new Descriptor(descriptorUrl.toString());
// From the client service descriptor extract the actual Cas Processor descriptor
String aResourceSpecifierPath = descriptor.getResourceSpecifierPath();
// Determine if this Cas Processor is parallelizable
boolean is = isMultipleDeploymentAllowed(aResourceSpecifierPath, casProcessorCPEConfig
.getName(), isConsumer);
return is;
// Integrated deployment: use the in-memory metadata of the already instantiated processor.
} else if (Constants.DEPLOYMENT_INTEGRATED.equalsIgnoreCase(casProcessorCPEConfig
.getDeployment())) {
// If OperationalProperties are not defined use defaults based on CasProcessor type
if (aProcessor.getProcessingResourceMetaData().getOperationalProperties() == null) {
if (isConsumer) {
return false; // default for CasConsumer
return true; // default for AEs
return aProcessor.getProcessingResourceMetaData().getOperationalProperties()
// Any other deployment value (e.g. remote): default is parallelizable
return true;
* Adds Cas Processor to a single-threaded pipeline. This pipeline is fed by the output queue and
* typically contains Cas Consumers. AEs can also be part of this pipeline.
* @param aProcessor -
* Cas Processor to add to single-threaded pipeline
* @param aCpName -
* name of the Cas Processor
* @throws Exception
// Adds aProcessor to the single-threaded (Cas Consumer) pipeline. The `consumers` table maps a
// processor name to its instance list, and consumerDeployList holds those instance lists.
// NOTE(review): in this view the else branch creates and registers newList but never adds
// aProcessor to it nor newList to consumerDeployList (compare addParallizableCasProcessor).
// The logging calls and the isDefault() body also appear truncated - confirm.
private void addCasConsumer(CasProcessor aProcessor, String aCpName) throws Exception {
if (consumers.containsKey(aCpName)) {
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), aCpName });
// Known name: append this instance to the existing instance list.
// NOTE(review): getIndexInList() returns 0 when the name is not found, which would silently
// append to the first list if `consumers` and consumerDeployList ever disagree.
int listIndex = getIndexInList(consumerDeployList, aCpName);
((List) consumerDeployList.get(listIndex)).add(aProcessor);
} else {
// New name: start a new instance list.
ArrayList newList = new ArrayList();
consumers.put(aCpName, newList);
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), aCpName });
if (cpeFactory.isDefault()) {
* Add Cas Processor to a list of CPs that are to be run in the parallelizable pipeline. The fact
* that the CP is in the parallelizable pipeline does not mean that there will be an instance per
* pipeline of the CP. It is allowed to have a single, shareable CP instance running in a multi-threaded
* pipeline.
* @param aProcessor -
* CP to add to parallelizable pipeline
* @param aCpName -
* name of the CP
* @throws Exception
// Adds aProcessor to the parallelizable pipeline. The `analysisEngines` table maps a processor
// name to its instance list, and annotatorDeployList holds those lists. A new name's list is
// inserted at the FRONT of annotatorDeployList; since classifyCasProcessors() walks processors
// from last to first, this presumably restores pipeline order - confirm.
// NOTE(review): the structured logging calls and the isDefault() body appear truncated in this
// view - confirm against the full file.
private void addParallizableCasProcessor(CasProcessor aProcessor, String aCpName)
throws Exception {
// NOTE(review): this CONFIG message is built and logged without an isLoggable() guard.
UIMAFramework.getLogger(this.getClass()).log(Level.CONFIG, " Adding New Annotator:" + aCpName);
if (analysisEngines.containsKey(aCpName)) {
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), aCpName });
// Known name: append this instance to its existing instance list.
int listIndex = getIndexInList(annotatorDeployList, aCpName);
((List) annotatorDeployList.get(listIndex)).add(aProcessor);
} else {
// New name: create its instance list and register it in both structures.
ArrayList newList = new ArrayList();
newList.add(0, aProcessor);
analysisEngines.put(aCpName, newList);
annotatorDeployList.add(0, newList);
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), aCpName });
if (cpeFactory.isDefault()) {
* Classify based on Cas Processor capability to run in parallel. Some Cas Processors need to run
* as single instance only. It scans the list of Cas Processors backwards and moves those Cas
* Processors that are not parallelizable to a separate single-threaded pipeline. This process of
* moving CPs continues until the first parallelizable Cas Processor is found. Beyond this all Cas
* Processors are moved to a parallelizable pipeline. If the non-parallelizable CP is in the
* parallelizable pipeline there simply will be a single instance of it that will be shared by all
* processing threads.
* @throws Exception
// Splits the registered Cas Processors between the Cas Consumer (single-instance) pipeline and
// the parallelizable pipeline.
//   - initial_cp_list is walked from its last entry to its first.
//   - Until the first parallelizable processor is seen, non-parallelizable processors are
//     routed to addCasConsumer().
//   - That first parallelizable processor disables reordering (allowReorder = false). From then
//     on every processor goes to addParallizableCasProcessor(); non-parallelizable ones are
//     added as a single instance only.
//   - `previous` is reset for each instance list and prevents adding more than one instance of a
//     non-parallelizable processor.
// NOTE(review): closing braces are missing in this view; the nesting above is read from the
// statement order and comments - confirm against the full file.
private void classifyCasProcessors() throws Exception {
boolean allowReorder = true;
// Walk the list of Cas Processor backwards. The list of Cas Processors is actually a list of
// lists.
// Each sub-list contains instances of the same type of Cas Processor.
for (int i = initial_cp_list.size(); i > 0; i--) {
// Get the sub-list containing instances of Cas Processor
ArrayList cp_instance_list = (ArrayList) initial_cp_list.get(i - 1);
String previous = ""; // hold the previous name of the Cas Processor
// Check the list of CP instances to check if its parallelizable
for (int j = 0; j < cp_instance_list.size(); j++) {
CasProcessor cp = (CasProcessor) cp_instance_list.get(j);
String name = cp.getProcessingResourceMetaData().getName();
// Check if the CP is parallelizable
boolean parallizable = isParallizable(cp, name);
// If Cas Processor is not parallizable and we have not yet hit a parallizable component
// place the Cas Processor in a pipeline that supports single instance components
if (!parallizable && allowReorder) {
// There should only be one instance. The current implementation supports placing
// non-parallizable Analysis Engines in the Cas Consumer Pipeline.
if (!previous.equals(name)) {
addCasConsumer(cp, name);
} else {
// Hit the parallizable Cas Processor. From this point on all Cas Processors will be added
// to the main processing pipeline ( as opposed to Cas Consumer Pipeline)
allowReorder = false;
// If the Cas Processor is non-parallizable add just one instance of it to the Pipeline.
if (parallizable || !previous.equals(name)) {
addParallizableCasProcessor(cp, name);
if (!parallizable) {
previous = name;
* Adds a CASProcessor to the processing pipeline. If a CasProcessor already exists and its
* status=DISABLED this method will re-enable the CasProcessor.
* @param aCasProcessor
* CASProcessor to be added to the processing pipeline
public void addCasProcessor(CasProcessor aCasProcessor) throws ResourceConfigurationException {
String name = aCasProcessor.getProcessingResourceMetaData().getName();
// Set a global flag to indicate that we should support mixed CasProcessor types.
// When this support is enabled, a TCAS array will be instantiated to facilitate
// conversions between CasData and TCAS.
if (aCasProcessor instanceof CasObjectProcessor || aCasProcessor instanceof CasConsumer) {
mixedCasProcessorTypeSupport = true;
// Instances appear to be grouped by name: each entry of initial_cp_list is a sub-list of
// instances of one Cas Processor. Look up the sub-list for this name (-1 means not found).
ArrayList newList = null;
int indexPos = getPositionInListIfExists(name, initial_cp_list);
if (indexPos == -1) {
newList = new ArrayList();
// New Cas Processor. Add it to a list
} else {
// A Cas Processor with this name is already known; reuse its sub-list.
newList = (ArrayList) initial_cp_list.get(indexPos);
// NOTE(review): the remainder of this method (presumably adding aCasProcessor to newList and,
// for a new name, registering newList in initial_cp_list) is not visible in this copy.
* Adds a CASProcessor to the processing pipeline at a given place in the processing pipeline
* @param aCasProcessor
* CASProcessor to be added to the processing pipeline
* @param aIndex -
* insertion point for a given CasProcessor
public void addCasProcessor(CasProcessor aCasProcessor, int aIndex)
throws ResourceConfigurationException {
// NOTE(review): the body of this overload is not visible in this copy, so its handling of
// aIndex (bounds checking, insertion semantics) cannot be confirmed from here.
* Removes a CASProcessor from the processing pipeline
* @param aCasProcessorIndex -
* CasProcessor position in processing pipeline
public void removeCasProcessor(int aCasProcessorIndex) {
// NOTE(review): this guard uses `> annotatorList.size()`, so an index equal to size() is not
// rejected even though valid list indices are 0..size()-1; `>=` looks intended - confirm.
if (aCasProcessorIndex < 0 || aCasProcessorIndex > annotatorList.size()) {
* Disables a CASProcessor in the processing pipeline
* @param aCasProcessorIndex
* position of the CASProcessor to disable in the processing pipeline
public void disableCasProcessor(int aCasProcessorIndex) {
// NOTE(review): `> annotatorList.size()` lets aCasProcessorIndex == size() through, and the
// get() below would then throw IndexOutOfBoundsException; `>=` looks intended - confirm.
if (aCasProcessorIndex < 0 || aCasProcessorIndex > annotatorList.size()) {
ProcessingContainer pc = ((ProcessingContainer) annotatorList.get(aCasProcessorIndex));
if (pc != null) {
// Trace the request. The key spelling "diabled" presumably matches an entry in
// CPM_LOG_RESOURCE_BUNDLE; do not change it here without updating the bundle too.
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_diabled_cp__FINEST",
new Object[] { Thread.currentThread().getName(), pc.getName() });
* Disables a CASProcessor in the processing pipeline
* @param aCasProcessorName
* name of the CASProcessor to disable
public void disableCasProcessor(String aCasProcessorName) {
// Linear search of annotatorList for the container whose name equals aCasProcessorName.
for (int i = 0; i < annotatorList.size(); i++) {
ProcessingContainer pc = ((ProcessingContainer) annotatorList.get(i));
if (pc.getName().equals(aCasProcessorName)) {
// The key spelling "diabled" presumably matches the resource bundle entry; leave as is.
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_diabled_cp__FINEST",
new Object[] { Thread.currentThread().getName(), pc.getName() });
* Enables a CASProcessor in the processing pipeline
* @param aCasProcessorName
* name of the CASProcessor to enable
public void enableCasProcessor(String aCasProcessorName) {
// Linear search of annotatorList for the container whose name equals aCasProcessorName.
for (int i = 0; i < annotatorList.size(); i++) {
ProcessingContainer pc = ((ProcessingContainer) annotatorList.get(i));
if (pc.getName().equals(aCasProcessorName)) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_enabled_cp__FINEST",
new Object[] { Thread.currentThread().getName(), aCasProcessorName });
* Returns all CASProcessors in the processing pipeline
public CasProcessor[] getCasProcessors() {
// Return the cached array if it has already been built (the field is assigned near the end of
// this method once Cas Processors have been classified).
if (casprocessorList != null) {
return casprocessorList;
// If CasProcessors have not yet been classified into AEs and CCs use the
// initial_cp_list. This list is populated early, right after the CPE
// descriptor is parsed. It is a list of lists, containing as many
// instances of each Cas Processor as defined in the CPE descriptor.
// The number of instances is determined based on number of processing
// threads and CP property setting that determines if the CP is able
// to run in parallel.
if (casProcessorsDeployed == false) {
// NOTE(review): this local shadows the casprocessorList field, so the pre-deployment result is
// never cached; presumably intentional because initial_cp_list may still change - confirm.
CasProcessor[] casprocessorList = new CasProcessor[initial_cp_list.size()];
ArrayList list;
for (int i = 0; i < initial_cp_list.size(); i++) {
list = (ArrayList) initial_cp_list.get(i);
// Every instance in sub-list i is written to the same slot i, so the array ends up holding
// only the last instance of each Cas Processor.
for (int j = 0; j < list.size(); j++) {
casprocessorList[i] = (CasProcessor) list.get(j);
return casprocessorList;
// CasProcessors have been classified into AEs and CCs, so merge the two lists
// (analysisEngines is walked first, then consumers).
ArrayList aList = new ArrayList();
Iterator keyIt = analysisEngines.keySet().iterator();
while (keyIt.hasNext()) {
// NOTE(review): the cast operand (presumably keyIt.next()) is missing from this copy.
String keyName = (String);
List kList = (List) analysisEngines.get(keyName);
if (kList != null) {
for (int i = 0; i < kList.size(); i++) {
keyIt = consumers.keySet().iterator();
while (keyIt.hasNext()) {
// NOTE(review): same missing cast operand as above.
String keyName = (String);
List kList = (List) consumers.get(keyName);
if (kList != null) {
for (int i = 0; i < kList.size(); i++) {
// Callers must handle a null return when no classified Cas Processors exist.
if (aList.size() == 0)
return null;
casprocessorList = new CasProcessor[aList.size()];
for (int j = 0; j < aList.size(); j++) {
casprocessorList[j] = (CasProcessor) aList.get(j);
return casprocessorList;
* Deploys all Cas Consumers
* @throws AbortCPMException
private void deployConsumers() throws AbortCPMException {
// Nothing to deploy when no Cas Consumers are configured.
if (consumerDeployList == null || consumerDeployList.size() == 0) {
CasProcessorDeployer deployer = null;
// Deploy each CASProcessor in a separate container, walking consumerDeployList from the end
for (int i = consumerDeployList.size(); i > 0; i--) {
try {
// Deployer deploys as many instances of CASProcessors as there are threads
List cpList = (ArrayList) consumerDeployList.get((i - 1)); // list is zero-based
// NOTE(review): cpList.get(0) runs before the cpList.size() > 0 check below, so an empty
// sub-list throws IndexOutOfBoundsException (then wrapped as AbortCPMException).
String name = ((CasProcessor) cpList.get(0)).getProcessingResourceMetaData().getName();
if (cpList.size() > 0) {
// Get a deployer for this type of CasProcessor. The type of deployer is determined from
// the
// CPE Configuration. Specifically from the deployment model for this CasProcessor.
CpeCasProcessor casProcessorType = (CpeCasProcessor) cpeFactory.casProcessorConfigMap
deployer = DeployFactory.getDeployer(cpeFactory, casProcessorType, pca);
// Deploy CasConsumer.
ProcessingContainer container = deployer.deployCasProcessor(cpList, false);
// NOTE(review): only e.getMessage() is propagated, so the original exception and its stack
// trace are lost; chain the cause if AbortCPMException supports it.
} catch (Exception e) {
throw new AbortCPMException(e.getMessage());
* Redeploys a CasProcessor using the deployer associated with the given {@link ProcessingContainer}
* @param aProcessingContainer - container whose CasProcessor is to be redeployed
public void redeployAnalysisEngine(ProcessingContainer aProcessingContainer) throws Exception {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_deploying_cp__FINEST",
new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName() });
// Obtain the deployer associated with this container.
CasProcessorDeployer deployer = aProcessingContainer.getDeployer();
// NOTE(review): the actual redeploy call on `deployer` is not visible in this copy.
* Deploys all Analysis Engines. Analysis Engines run in replicated processing units, separate
* from Cas Consumers.
* @throws AbortCPMException
private void deployAnalysisEngines() throws AbortCPMException {
// When restoring the CPM from a checkpoint, its processing pipeline must be restored
// to a previous state. So all CasProcessors that were disabled during the previous run
// will remain disabled. All stats will be recovered as well.
// if (restoredProcTr != null)
if (checkpointData != null) {
// Restore CPM-related stats from the checkpoint
restoreFromCheckpoint("CPM", "CPM PROCESSING TIME");
CasProcessorDeployer deployer = null;
// Deploy each CASProcessor in a separate container
for (int i = 0; i < annotatorDeployList.size(); i++) {
try {
// Deployer deploys as many instances of CASProcessors as there are threads
List cpList = (ArrayList) annotatorDeployList.get(i);
// NOTE(review): cpList.get(0) is evaluated before the cpList.size() > 0 check below; an
// empty sub-list fails with IndexOutOfBoundsException (wrapped as AbortCPMException).
String name = ((CasProcessor) cpList.get(0)).getProcessingResourceMetaData().getName();
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), name });
if (cpList.size() > 0) {
// Get a deployer for this type of CasProcessor. The type of deployer is determined from
// the
// CPE Configuration. Specifically from the deployment model for this CasProcessor. The
// first
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_lookup_cp__FINEST",
new Object[] { Thread.currentThread().getName(), name });
// Fail fast if the CPE configuration has no entry for this Cas Processor name.
if (!cpeFactory.casProcessorConfigMap.containsKey(name)) {
if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
new Object[] { Thread.currentThread().getName(), name });
throw new Exception(CpmLocalizedMessage.getLocalizedMessage(
new Object[] { Thread.currentThread().getName(), name }));
// Look up the configuration, then reject a null entry or a blank deployment model before
// asking DeployFactory for a deployer.
CpeCasProcessor casProcessorCPEConfig = (CpeCasProcessor) cpeFactory.casProcessorConfigMap
if (casProcessorCPEConfig == null) {
if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
new Object[] { Thread.currentThread().getName(), name });
throw new Exception(CpmLocalizedMessage.getLocalizedMessage(
new Object[] { Thread.currentThread().getName(), name }));
} else if (casProcessorCPEConfig.getDeployment() == null
|| casProcessorCPEConfig.getDeployment().trim().length() == 0) {
if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
new Object[] { Thread.currentThread().getName(), name });
throw new Exception(CpmLocalizedMessage.getLocalizedMessage(
"UIMA_CPM_Exception_invalid_deployment__WARNING", new Object[] {
Thread.currentThread().getName(), name,
casProcessorCPEConfig.getDeployment() }));
deployer = DeployFactory.getDeployer(cpeFactory, casProcessorCPEConfig, pca);
// Deploy the Analysis Engine instances held in cpList.
ProcessingContainer container = deployer.deployCasProcessor(cpList, false);
// Any failure aborts the whole CPM. NOTE(review): only e.getMessage() is propagated; the
// original exception is not chained as a cause.
} catch (Exception e) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), e.getMessage() });
throw new AbortCPMException(e.getMessage());
* Starts CASProcessor containers one at a time. During this phase the container deploys a TAE as
* a local, remote, or integrated CasProcessor.
public void deployCasProcessors() throws AbortCPMException {
// NOTE(review): the statements inside this try block, and the deploy calls announced by the
// CONFIG log messages below, are not visible in this copy.
try {
} catch (Exception e) {
throw new AbortCPMException(e.getMessage());
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(this.getClass()).log(Level.CONFIG, "Deploying Analysis Engines");
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(this.getClass()).log(Level.CONFIG, "Deploying CasConsumers");
// Record that deployment is done; getCasProcessors() branches on this flag.
casProcessorsDeployed = true;
* Restores named events from the checkpoint
* @param component -
* component name to restore named event for
* @param aEvType -
* event to restore
private void restoreFromCheckpoint(String component, String aEvType) {
if (checkpointData == null) {
return; // nothing to restore
ProcessTrace restoredProcTr = checkpointData.getProcessTrace();
try {
// Retrieve all events associated with a named component
List eList = restoredProcTr.getEventsByComponentName(component, true);
if (!eList.isEmpty()) {
// Copy named events of type aEvType found in the checkpoint into the current procTr
copyComponentEvents(aEvType, eList, procTr);
// NOTE(review): the catch body is not visible in this copy; if it is empty, restore failures
// are silently ignored and should at least be logged.
} catch (Exception e) {
* Copies the given component events of the requested type
* @param aEvType -
* event type
* @param aList -
* list of events to copy
* @param aPTr -
* process trace to copy the events into
* @throws IOException
private void copyComponentEvents(String aEvType, List aList, ProcessTrace aPTr)
throws IOException {
String typeS;
// Only events whose type equals aEvType are selected; a null aEvType selects nothing.
for (int i = 0; i < aList.size(); i++) {
ProcessTraceEvent prEvent = (ProcessTraceEvent) aList.get(i);
typeS = prEvent.getType();
if (aEvType != null && aEvType.equals(typeS)) {
* Returns a global flag indicating if this Thread is in processing state
public boolean isRunning() {
// NOTE(review): read without synchronization; cross-thread visibility depends on how the
// isRunning field is declared (e.g. volatile), which is not visible here.
return isRunning;
* Returns a global flag indicating if this Thread is in pause state
public boolean isPaused() {
// Read under lockForPause, the same monitor pauseIt()/resumeIt() hold when writing `pause`.
synchronized (lockForPause) {
return (pause == true);
* Pauses this thread
public void pauseIt() {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_pause_cpe__FINEST",
new Object[] { Thread.currentThread().getName() });
// Set the pause flag under lockForPause so isPaused() reads it under the same monitor.
synchronized (lockForPause) {
pause = true;
* Resumes this thread
public void resumeIt() {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_resume_cpe__FINEST",
new Object[] { Thread.currentThread().getName() });
// Clear the pause flag under the same monitor used by pauseIt()/isPaused().
synchronized (lockForPause) {
pause = false;
// NOTE(review): the "notify_engine" trace below suggests waiting threads are notified here;
// the corresponding notify call is not visible in this copy.
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_notify_engine__FINEST",
new Object[] { Thread.currentThread().getName() });
* Sets the CollectionReader to use during processing
* @param aCollectionReader
* the collection reader to use
public void setCollectionReader(BaseCollectionReader aCollectionReader) {
collectionReader = aCollectionReader;
// Honor an optional "fetchSize" configuration parameter declared by the reader.
if (collectionReader.getProcessingResourceMetaData().getConfigurationParameterSettings()
.getParameterValue("fetchSize") != null) {
// NOTE(review): a non-Integer parameter value would fail the (Integer) cast with
// ClassCastException, which the NumberFormatException handler below does not catch.
try {
readerFetchSize = ((Integer) collectionReader.getProcessingResourceMetaData()
} catch (NumberFormatException nfe) {
readerFetchSize = 1; // restore default
// When resuming from a checkpoint, let a recoverable reader reposition itself at the last synch
// point and restore its timing stats. NOTE(review): collectionReader was already dereferenced
// above, so the `collectionReader != null` test here is redundant.
if (checkpointData != null && checkpointData.getSynchPoint() != null
&& collectionReader != null) {
if (collectionReader instanceof RecoverableCollectionReader) {
try {
// Let the CollectionReader do the synchronization to the last known (good) read point
((RecoverableCollectionReader) collectionReader).moveTo(checkpointData.getSynchPoint());
String readerName = collectionReader.getProcessingResourceMetaData().getName();
if (readerName != null) {
restoreFromCheckpoint(readerName, "COLLECTION READER PROCESSING TIME");
} catch (Exception e) {
* Sets the number of entities to process (the batch size)
public void setNumToProcess(long aNumToProcess) {
// Plain setter; aNumToProcess is stored without validation.
numToProcess = aNumToProcess;
* Returns Id of the last document processed
public String getLastProcessedDocId() {
// Delegates to the ArtifactProducer. NOTE(review): producer is assigned in run(), so calling
// this earlier would throw NullPointerException - confirm callers only use it after start.
return producer.getLastDocId();
// Returns the repository of the last document; this implementation always returns "".
public String getLastDocRepository() {
return "";
* Instantiate custom Processing Pipeline
* @param aClassName -
* name of a class that extends ProcessingUnit
* @return - an instance of the ProcessingUnit
* @throws Exception
private ProcessingUnit producePU(String aClassName) throws Exception {
Class currentClass = Class.forName(aClassName);
// NOTE(review): no subclass check is actually performed; a class that is not a ProcessingUnit
// fails with ClassCastException on the cast below. Class.newInstance() is deprecated since
// Java 9 in favor of getDeclaredConstructor().newInstance().
ProcessingUnit pu = (ProcessingUnit) currentClass.newInstance();
return pu;
// Creates the debug control thread for the file named by the DEBUG_CONTROL system property.
// NOTE(review): 1000 is presumably a polling interval in ms, and the start() call is not visible
// in this copy - confirm.
private void startDebugControlThread() {
String dbgCtrlFile = System.getProperty("DEBUG_CONTROL");
dbgCtrlThread = new DebugControlThread(this, dbgCtrlFile, 1000);
* Instantiate custom Output Queue
* @param aQueueSize -
* max size of the queue
* @return - new instance of the output queue
* @throws Exception
private BoundedWorkQueue createOutputQueue(int aQueueSize) throws Exception {
// Get the class that implements the queue.
// NOTE(review): getOutputQueue().getClass() is the Class object of the config element and is
// never null, so the second condition is always true; getQueueClass() was probably intended.
// The outputQueueClass null check below still guards the reflective path.
if (cpeFactory.getCPEConfig().getOutputQueue() != null
&& cpeFactory.getCPEConfig().getOutputQueue().getClass() != null) {
String outputQueueClass = cpeFactory.getCPEConfig().getOutputQueue().getQueueClass();
if (outputQueueClass != null) {
// Custom queue: the class must expose a public (int, String, CPMEngine) constructor.
Class[] args = new Class[] { int.class, String.class, CPMEngine.class };
Class cpClass = Class.forName(outputQueueClass);
Constructor constructor = cpClass.getConstructor(args);
Object[] oArgs = new Object[] { new Integer(aQueueSize), "Sequenced Output Queue", this };
outputQueue = (BoundedWorkQueue) constructor.newInstance(oArgs);
} else {
// default queue
outputQueue = new BoundedWorkQueue(aQueueSize, "Output Queue", this);
return outputQueue;
* Notifies listeners of a given exception
* @param e -
* an exception to be sent to listeners
private void notifyListenersWithException(Exception e) {
UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, e.getMessage(), e);
ArrayList statusCbL = this.getCallbackListeners();
EntityProcessStatusImpl enProcSt = new EntityProcessStatusImpl(procTr, true);
// e is the actual exception.
enProcSt.addEventStatus("CPM", "Failed", e);
// Notify listeners of the failure. Only StatusCallbackListener instances are called; other
// BaseStatusCallbackListener implementations are skipped. A null CAS is passed (presumably
// because the failure is not tied to a particular CAS).
for (int j = 0; statusCbL != null && j < statusCbL.size(); j++) {
BaseStatusCallbackListener st = (BaseStatusCallbackListener) statusCbL.get(j);
if (st instanceof StatusCallbackListener) {
((StatusCallbackListener) st).entityProcessComplete(null, enProcSt);
* Callback method used to notify the engine when a processing pipeline is killed due to excessive
* errors. This method is only called if the processing pipeline is unable to acquire a connection
* to remote service and when configuration indicates 'kill-pipeline' as the action to take on
* excessive errors. When running with multiple pipelines, routine decrements a global pipeline
* counter and tests if there are no more left. When all pipelines are killed as described above,
* the CPM needs to terminate. Since pipelines are prematurely killed, there are artifacts (CASes)
* in the work queue. These must be removed from the work queue and disposed of (released) back to
* the CAS pool so that the Collection Reader thread properly exits.
* @param aPipelineThreadName -
* name of the pipeline thread exiting from its run() method
public synchronized void pipelineKilled(String aPipelineThreadName) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_pipeline_terminated__FINEST",
new Object[] { Thread.currentThread().getName(), aPipelineThreadName });
// Adjust the global counter
// NOTE(review): the statement decrementing activeProcessingUnits is not visible in this copy.
// Test if there are any processing pipelines left
if (activeProcessingUnits <= 0) {
// Change the global status of the CPM
isRunning = false;
// Check the work queue for any artifacts still unprocessed. This is a likely case, since the
// Collection Reader is asynchronous. If there are artifacts still in the queue, this code
// needs to remove them and release them back to the CAS pool. This needs to be done to
// unblock those threads that are waiting for an available CAS instance, most notably the
// Collection Reader Thread.
while (workQueue != null && workQueue.getCurrentSize() > 0) {
// empty work queue
try {
// NOTE(review): the argument 1 is presumably a wait timeout - confirm in BoundedWorkQueue.
Object anObject = workQueue.dequeue(1);
if (anObject != null && anObject instanceof CAS[]) {
// Notify listeners of the fact that the CPM is disposing the CAS
notifyListeners(0, (CAS[]) anObject, procTr, new Exception(
"CPM Releases CAS before processing it due to premature CPM shutdown."));
releaseCASes((CAS[]) anObject);
// Failures while draining are logged (at SEVERE) rather than propagated.
} catch (Exception e) {
if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), e.getMessage() });
* Using given configuration creates and starts CPE processing pipeline. It is either
* single-threaded or a multi-threaded pipeline. Which is actually used depends on the
* configuration defined in the CPE descriptor. In multi-threaded mode, the CPE starts number of
* threads: 1) ArtifactProducer Thread - this is a thread containing a Collection Reader. It runs
* asynchronously and it fills a WorkQueue with CASes. 2) CasConsumer Thread - this is an optional
* thread. It is only instantiated if there are Cas Consumers in the pipeline. 3) Processing
* Threads - one or more processing threads, configured identically, that perform analysis. How
* many threads are started depends on configuration in the CPE descriptor.
* All threads started here are placed in a ThreadGroup. This provides a catch-all mechanism for
* errors that may occur in the CPM. If an error is thrown, the ThreadGroup is notified. The
* ThreadGroup then notifies all registered listeners to give an application a chance to report the
* error and do necessary cleanup. This routine manages all the threads and makes sure that all of
* them are cleaned up before returning. The ThreadGroup must cleanup all threads under its
* control otherwise a memory leak occurs. Even those threads that are not started must be cleaned
* as they end up in the ThreadGroup when instantiated. The code uses number of state variables to
* make decisions during cleanup.
public void run() {
boolean consumerCompleted = false;
boolean isStarted = false; // Indicates if all threads have been started
if (isKilled()) {
// Single-threaded mode is enabled in the CPE descriptor. In the CpeConfig element check for the
// value of deployAs
// <deployAs>single-threaded</deployAs>
if (singleThreadedCPE) {
try {
} catch (Throwable t) {
killed = true;
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), t.getMessage() });
} finally {
((CPMThreadGroup) getThreadGroup()).cleanup();
// Fix for memory leak. CPMThreadGroup must be
// destroyed, but not until AFTER all threads that it
// owns, including this one, have ended. - Adam
final ThreadGroup group = this.getThreadGroup();
Thread threadGroupDestroyer = new Thread(group.getParent(), "threadGroupDestroyer") {
public void run() {
while (group.activeCount() > 0) {
try {
} catch (InterruptedException e) {
try {
isRunning = true;
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_starting_cpe__FINEST",
new Object[] { Thread.currentThread().getName() });
// How many entities to get for each fetch from the CollectionReader. Use default, otherwise
// retrieve and override from ColectionReader descriptor.
readerFetchSize = 1;
if (collectionReader.getProcessingResourceMetaData().getConfigurationParameterSettings()
.getParameterValue("fetchSize") != null) {
readerFetchSize = ((Integer) collectionReader.getProcessingResourceMetaData()
if (System.getProperty("DEBUG_CONTROL") != null) {
// CAS[] casList = null;
if (mixedCasProcessorTypeSupport == false && collectionReader instanceof CollectionReader) {
mixedCasProcessorTypeSupport = true;
// When the CPE is configured to run exclusively with CasDataProcessor type components (no
// CasObjectProcessors)
// there is no need to instantiate TCAS objects. These would never be used and woud waste
// memory.
if (mixedCasProcessorTypeSupport) {
// Instantiate container for TCAS Instances
try {
// Register all type systems with the CAS Manager
if (poolSize == 0) // Not set in the CpeDescriptor
poolSize = readerFetchSize * (inputQueueSize + outputQueueSize)
* cpeFactory.getProcessingUnitThreadCount() + 3;
// This is a hack to limit # of CASes. In WF env where the WF Store decides the size of
// readerFetchSize
// we have a problem with memory. If the store decides to return 1000 entities we will
// need a LOT of
// memory to handle this. So for WF limit the pool size to something more reasonable
if (poolSize > 100) {
.println(" PoolSize exceeds hard limit(100). Redefining size to 60.");
this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
new Object[] { Thread.currentThread().getName() });
poolSize = 60; // Hard limit
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), String.valueOf(poolSize) });
casPool = new CPECasPool(poolSize, cpeFactory.getResourceManager().getCasManager(),
} catch (Exception e) {
isRunning = false;
killed = true;
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), e.getMessage() });
UIMAFramework.getLogger(this.getClass()).log(Level.FINER, "", e);
// Instantiate work queue. This queue is shared among all processing units.
// The Producer thread fills this queue with CAS'es and processing units
// retrieve these Cas'es for analysis.
workQueue = new BoundedWorkQueue(poolSize, "Input Queue", this);
// Instantiate output queue. The Cas'es containing result of analysis are deposited to
// this queue, and the CasConsumer Processing Unit retrieves them.
if (consumerList != null && consumerList.size() > 0) {
outputQueue = createOutputQueue(poolSize);
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
"initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_create_producer__CONFIG",
new Object[] { Thread.currentThread().getName() });
// Producer is responsible for filling work queue with Cas'es. Runs in a seperate thread until
// all entities are processed or the CPM stops.
producer = new ArtifactProducer(this, casPool);
try {
// Plugin custom timer for measuring performance of the CollectionReader
} catch (Exception e) {
// Use default Timer. Ignore the exception
producer.setUimaTimer(new JavaTimer());
// indicate how many entities to process
// producer.setOutputQueue(outputQueue);
// collect stats in shared instance
for (int j = 0; j < statusCbL.size(); j++) {
BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
if (statCL != null)
// Just in case check if the CPM has the right state to start
if (isKilled()) {
// Nov 2005, postpone starting the Producer Thread until all other threads are up.
// This prevents a problem when the Producer Thread starts, grabs all CASes, fills the
// input queue and there is an exception BEFORE Processing Units starts. This may lead
// to a hang, because the CR is waiting on the CAS Pool and no-one consumes the Input Queue.
// Name the thread
producer.setName("[CollectionReader Thread]::");
// Create Cas Consumer Thread
if (consumerList != null && consumerList.size() > 0) {
// Create a CasConsumer Processing Unit if there is at least one CasConsumer configured in a
// CPE descriptor
casConsumerPU = new ProcessingUnit(this, outputQueue, null);
// Add Callback Listeners
for (int j = 0; j < statusCbL.size(); j++) {
BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
if (statCL != null) {
// Notify Callback Listeners when done processing entity
// Add custom timer
try {
} catch (Exception e) {
// Use default Timer
casConsumerPU.setUimaTimer(new JavaTimer());
// name the thread
casConsumerPU.setName("[CasConsumer Pipeline Thread]::");
// start the CasConsumer Thread
consumerThreadStarted = true;
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
new Object[] { Thread.currentThread().getName(),
String.valueOf(workQueue.getCurrentSize()) });
// Adjust number of pipelines. Adjustment may be necessary in deployments using exclusive
// service access. The adjustment is
// based on number of available services that the CPM will connect to. If a static
// configuration calls for 5 processing
// pipelines but only three services are available (assuming exclusive access ), the CPM will
// reduce number of processing
// pipelines to 3.
for (int indx = 0; indx < annotatorList.size(); indx++) {
ProcessingContainer prContainer = (ProcessingContainer) annotatorList.get(indx);
CasProcessorConfiguration configuration = prContainer.getCasProcessorConfiguration();
if (configuration == null) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), prContainer.getName() });
String serviceAccess = configuration.getDeploymentParameter("service-access");
if (serviceAccess != null && serviceAccess.equalsIgnoreCase("exclusive")) {
if (prContainer.getPool() != null) {
int totalInstanceCount = prContainer.getPool().getSize();
if (totalInstanceCount == 0) {
this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
new Object[] { Thread.currentThread().getName(), prContainer.getName() });
if (totalInstanceCount < concurrentThreadCount) {
concurrentThreadCount = totalInstanceCount; // override
this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
new Object[] { Thread.currentThread().getName(), prContainer.getName() });
// Setup Processing Pipelines
processingUnits = new ProcessingUnit[concurrentThreadCount];
synchronized (this) {
activeProcessingUnits = concurrentThreadCount; // keeps track of how many threads are still
// active. -Adam
// Capture the state of the pipelines. Initially the state is -1, meaning Not Started
processingThreadsState = new int[concurrentThreadCount];
for (int inx = 0; inx < concurrentThreadCount; inx++) {
processingThreadsState[inx] = -1; // Not Started
// Configure Processing Pipelines, and start each running in a seperate thread
for (int i = 0; i < concurrentThreadCount; i++) {
// casList = new CAS[readerFetchSize];
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), String.valueOf(i) });
// Plug in custom ProcessingUnit via -DPROCESSING_PIPELINE_IMPL=class
// Initialize Processing Pipeline with input and output queues
if (System.getProperty("PROCESSING_PIPELINE_IMPL") != null) {
String puClass = System.getProperty("PROCESSING_PIPELINE_IMPL");
try {
processingUnits[i] = producePU(puClass);
} catch (Exception e) {
UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, e.getMessage(), e);
if (dbgCtrlThread != null) {
return; // / DONE HERE !!!
} else {
processingUnits[i] = new ProcessingUnit(this, workQueue, outputQueue);
// If there are no consumers in the pipeline, instruct the pipeline to release a CAS at the
// end of processing
if (consumerList == null || consumerList.size() == 0) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(),
processingUnits[i].getClass().getName() });
// Add tracing instance so that performance and stats are globally aggregated for all
// processing pipelines
// Add all annotators to the processing pipeline
// pass initialized list of cases to processing units in case cas conversion is required
// between
// CasData and CASObject based annotators.
try {
} catch (Exception e) {
processingUnits[i].setUimaTimer(new JavaTimer());
// Add Callback Listeners
for (int j = 0; j < statusCbL.size(); j++) {
BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
if (statCL != null)
// Start the Processing Unit thread
processingUnits[i].setName("[Procesing Pipeline#" + (i + 1) + " Thread]::");
// Start the Processing Pipeline
processingThreadsState[i] = 1; // Started
// Start the ArtifactProducer thread and the Collection Reader embedded therein. The
// Collection Reader begins
// processing and deposits CASes onto a work queue.
readerThreadStarted = true;
// Indicate that ALL threads making up the CPE have been started
isStarted = true;
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
// ==============================================================================================
// Now, wait for ALL CPE threads to finish. Join each thread created and wait for each to
// finish.
// ==============================================================================================
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_threads__FINEST",
new Object[] { Thread.currentThread().getName() });
// Join the producer as it knows when to stop processing. When it is done, it
// simply terminates the thread. Once it terminates lets just make sure that
// all threads finish and the work queue is completely depleted and all entities
// are processed
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
// Join each of the Processing Threads and wait for them to finish
for (int i = 0; i < concurrentThreadCount; i++) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), processingUnits[i].getName(),
String.valueOf(i) });
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), processingUnits[i].getName(),
String.valueOf(i) });
// Join the Consumer Thread and wait for it to finish
if (casConsumerPU != null) {
try {
// Throw in a EOF token onto an output queue to indicate end of processing. The consumer
// will stop the processing upon receiving this token
Object[] eofToken = new Object[1];
// only need one member in the array
eofToken[0] = new EOFToken();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
synchronized (outputQueue) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
} catch (Exception e) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), e.getMessage() });
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
consumerCompleted = true;
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), workQueue.getName(),
String.valueOf(workQueue.getCurrentSize()) });
boolean empty = false;
while (!empty && outputQueue != null) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), outputQueue.getName(),
String.valueOf(outputQueue.getCurrentSize()) });
synchronized (outputQueue) {
if (outputQueue.getCurrentSize() == 0) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), outputQueue.getName(),
String.valueOf(outputQueue.getCurrentSize()) });
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), outputQueue.getName(),
String.valueOf(outputQueue.getCurrentSize()) });
if (casConsumerPU != null) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cleaning_up_pus__FINEST",
new Object[] { Thread.currentThread().getName() });
// Terminate Annotators and cleanup resources
for (int i = 0; i < processingUnits.length; i++) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_stop_processors__FINEST",
new Object[] { Thread.currentThread().getName(), String.valueOf(i) });
if (casConsumerPU != null) {
// Terminate CasConsumers and cleanup
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_engine_stopped__FINEST",
new Object[] { Thread.currentThread().getName() });
if (dbgCtrlThread != null) {
isRunning = false;
} catch (Exception e) {
isRunning = false;
killed = true;
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
new Object[] { Thread.currentThread().getName(), e.getMessage() });
UIMAFramework.getLogger(this.getClass()).log(Level.FINER, "", e);
// The CPE has not been started successfully. Perhaps only partially started. Meaning, that
// some of its threads are started and some not. This may lead to a memory leak as not started
// threads are never garbage collected. If this is the state of the CPE (!isStarted) go
// through
// a cleanup cycle checking each thread and starting those that have not been started. All
// CPE threads in their run() method MUST check the state of the CPE by calling
// cpe.isRunning()
// as the first thing in their run() methods. If this query returns false, all threads should
// return from run() without doing any work. But at least they will be garbage collected.
if (!isStarted) {
// Cleanup not started threads
// First the ArtifactProducer Thread
if (producer != null && !producer.isRunning()) {
try {
if (!readerThreadStarted) {
} catch (Exception ex1) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_exception__SEVERE",
new Object[] { Thread.currentThread().getName(), ex1.getMessage() });
UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, "", ex1);
// Cleanup CasConsumer
if (casConsumerPU != null && !casConsumerPU.isRunning()) {
try {
if (!consumerThreadStarted) {
} catch (Exception ex1) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cc_exception__SEVERE",
new Object[] { Thread.currentThread().getName(), ex1.getMessage() });
UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, "", ex1);
try {
// Place EOF Token onto work queue to force PUs shutdown
// Cleanup Processing Threads
for (int i = 0; processingUnits != null && i < concurrentThreadCount; i++) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(),
processingUnits[i].getName(), String.valueOf(i) });
if (processingUnits[i] != null) {
// In case the processing thread was created BUT not started we need to
// start it to make sure it is cleaned up by the ThreadGroup. Not started
// threads hang around in the ThreadGroup despite the fact that are started.
// The run() method is instrumented to immediately exit since the CPE is
// not running. So the thread only starts for a brief moment and than stops.
// This code is only executed in case where the thread is NOT started
// In such a case 'processingThreadsState[i] = -1'
if (processingThreadsState[i] == -1 && !processingUnits[i].isRunning()) {
try {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(),
processingUnits[i].getName(), String.valueOf(i) });
} catch (Exception ex1) {
this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
new Object[] { Thread.currentThread().getName(), ex1.getMessage() });
UIMAFramework.getLogger(this.getClass()).log(Level.FINER, "", ex1);
} catch (Exception ex) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
new Object[] { Thread.currentThread().getName(), ex.getMessage() });
UIMAFramework.getLogger(this.getClass()).log(Level.FINER, "", ex);
} finally {
if (!consumerCompleted && casConsumerPU != null) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
try {
Object[] eofToken = new Object[1];
// only need one member in the array
eofToken[0] = new EOFToken();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
synchronized (outputQueue) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
} catch (Exception e) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
try {
} catch (InterruptedException e) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cc_completed__FINEST",
new Object[] { Thread.currentThread().getName() });
// Fix for memory leak. CPMThreadGroup must be
// destroyed, but not until AFTER all threads that it
// owns, including this one, have ended. - Adam
final ThreadGroup group = this.getThreadGroup();
Thread threadGroupDestroyer = new Thread(group.getParent(), "threadGroupDestroyer") {
public void run() {
Thread[] threads;
while (group.activeCount() > 0) {
try {
} catch (InterruptedException e) {
threads = new Thread[group.activeCount()];
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
/**
 * Reports, at FINEST level, each non-null entry of the given thread list. The argument array built
 * for each entry holds the current thread name, the entry's index and the entry's thread name.
 * NOTE(review): the logging call that consumes this argument array is not visible in this view.
 *
 * @param aThreadList threads to report; a null list is tolerated and reports nothing
 */
private void showThreads(Thread[] aThreadList) {
for (int i = 0; aThreadList != null && i < aThreadList.length; i++) {
// Skip empty slots, and do no work at all unless FINEST logging is enabled.
if (aThreadList[i] != null && UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), String.valueOf(i),
aThreadList[i].getName() });
/**
 * Place EOF Token onto a work queue to force thread exit.
 *
 * The token is a one-element Object[] holding a new EOFToken. The work queue is accessed while
 * holding its monitor. Any exception is caught and logged at SEVERE level
 * ("UIMA_CPM_exception_adding_eof__SEVERE") rather than propagated.
 * NOTE(review): the statements that actually add the token to workQueue are not visible in this
 * view; confirm they sit inside the synchronized block.
 */
private void forcePUShutdown() {
try {
Object[] eofToken = new Object[1];
// only need one member in the array
eofToken[0] = new EOFToken();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), workQueue.getName() });
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), workQueue.getName() });
// Hold the work queue's monitor while touching the queue.
synchronized (workQueue) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), workQueue.getName() });
} catch (Exception e) {
// Best effort: a failure to place the EOF token is logged, not rethrown.
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception_adding_eof__SEVERE",
new Object[] { Thread.currentThread().getName(), e.getMessage() });
* Return timer to measure performace of the cpm. The timer can optionally be configured in the
* CPE descriptor. If none defined, the method returns default timer.
* @return - customer timer or JavaTimer (default)
* @throws Exception
private UimaTimer getTimer() throws Exception {
String uimaTimerClass = cpeFactory.getCPEConfig().getCpeTimer().get();
if (uimaTimerClass != null) {
new TimerFactory(uimaTimerClass);
return TimerFactory.getTimer();
// If not timer defined return default timer based on System.currentTimeMillis()
return new JavaTimer();
/**
 * Null out fields of this object. Call this only when this object is no longer needed.
 *
 * Releases the references held by this engine (processing units, CAS consumer unit, collection
 * reader, producer, deploy/processor lists, stats, status callback listeners, CAS pool, checkpoint
 * data, process trace and CPE factory) by assigning null. Any exception is logged at FINER level
 * and is not rethrown.
 * NOTE(review): the statements inside several of the guarded branches below (per-unit cleanup,
 * debug control thread handling) are not visible in this view.
 */
public void cleanup() {
try {
if (processingUnits != null) {
for (int i = 0; i < this.processingUnits.length; i++) {
if (dbgCtrlThread != null) {
if (casConsumerPU != null) {
this.casConsumerPU = null;
if (collectionReader != null) {
this.collectionReader = null;
if (producer != null) {
this.producer = null;
if (consumerDeployList != null) {
this.consumerDeployList = null;
if (analysisEngines != null) {
this.analysisEngines = null;
if (annotatorDeployList != null) {
this.annotatorDeployList = null;
if (annotatorList != null) {
this.annotatorList = null;
if (consumerList != null) {
this.consumerList = null;
if (consumers != null) {
this.consumers = null;
// Drop the remaining references unconditionally.
this.processingUnits = null;
this.casprocessorList = null;
// this.enProcSt = null;
this.stats = null;
this.statusCbL = null;
// this.tcas = null;
this.casPool = null;
// this.restoredProcTr = null;
this.checkpointData = null;
this.procTr = null;
this.cpeFactory = null;
} catch (Exception e) {
// Cleanup is best effort: failures are only logged.
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
new Object[] { Thread.currentThread().getName(), e.getMessage() });
/**
 * Registers Type Systems of all components with the CasManager.
 *
 * Components considered: the collection reader, its CasInitializer (for a CollectionReader) or
 * CasDataInitializer (for a CasDataCollectionReader), and every annotator and consumer container
 * whose status is not CAS_PROCESSOR_DISABLED.
 * NOTE(review): the registration calls themselves (presumably on {@code manager}) are not visible
 * in this view.
 *
 * @throws Exception propagated from metadata lookup or registration
 */
private void registerTypeSystemsWithCasManager() throws Exception {
CasManager manager= this.cpeFactory.getResourceManager().getCasManager();
ProcessingResourceMetaData crMetaData = collectionReader.getProcessingResourceMetaData();
if (crMetaData != null) {
// The initializer attached to the reader is looked up differently depending on the reader type.
if (collectionReader instanceof CollectionReader) {
CasInitializer casIni = ((CollectionReader) collectionReader).getCasInitializer();
if (casIni != null && casIni.getProcessingResourceMetaData() != null) {
} else if (collectionReader instanceof CasDataCollectionReader) {
CasDataInitializer casIni = ((CasDataCollectionReader) collectionReader)
if (casIni != null && casIni.getCasInitializerMetaData() != null) {
// Annotator containers
for (int i = 0; i < annotatorList.size(); i++) {
ProcessingContainer container = (ProcessingContainer) annotatorList.get(i);
if (container.getStatus() == Constants.CAS_PROCESSOR_DISABLED) {
continue; // skip over disabled CasProcessors
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), container.getName() });
CasProcessor processor = container.getCasProcessor();
try {
if (processor instanceof AnalysisEngineImplBase) {
//Integrated AEs already have added their metadata to the CasManager during
//their initialization, so we don't need to do it again.
//(Exception: when running from "old" CPM interface - where AEs are created outside
// and passed in, the AE may not share a ResourceManager with the CPE. In that case
// we DO need to register its metadata.)
if (((AnalysisEngine)processor).getResourceManager() == this.cpeFactory.getResourceManager())
ProcessingResourceMetaData md = processor.getProcessingResourceMetaData();
if (md != null) {
finally {
// Consumer containers: same handling as the annotator containers above.
for (int i = 0; i < consumerList.size(); i++) {
ProcessingContainer container = (ProcessingContainer) consumerList.get(i);
if (container.getStatus() == Constants.CAS_PROCESSOR_DISABLED) {
continue; // skip over disabled CasProcessors
if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINE, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), container.getName() });
CasProcessor processor = container.getCasProcessor();
try {
if (processor instanceof AnalysisEngineImplBase) {
//(Exception: when running from "old" CPM interface - where AEs are created outside
// and passed in, the AE may not share a ResourceManager with the CPE. In that case
// we DO need to register its metadata.)
if (((AnalysisEngine)processor).getResourceManager() == this.cpeFactory.getResourceManager())
ProcessingResourceMetaData md = processor.getProcessingResourceMetaData();
if (md != null) {
finally {
/**
 * Call typeSystemInit method on each component.
 *
 * A CAS is taken from casPool to obtain the type system. typeSystemInit is then called on the
 * collection reader (when it is a CollectionReader) and on every annotator and consumer whose
 * container is not CAS_PROCESSOR_DISABLED and whose processor is a CasObjectProcessor. The
 * reader's CasInitializer is also looked up; the handling of a non-null initializer is not visible
 * in this view.
 *
 * @throws ResourceInitializationException if a component fails; any other exception is wrapped
 *         in a ResourceInitializationException
 */
private void callTypeSystemInit() throws ResourceInitializationException {
CAS cas = casPool.getCas();
try {
if (collectionReader instanceof CollectionReader) {
((CollectionReader) collectionReader).typeSystemInit(cas.getTypeSystem());
CasInitializer casIni = ((CollectionReader) collectionReader).getCasInitializer();
if (casIni != null) {
for (int i = 0; i < annotatorList.size(); i++) {
ProcessingContainer container = (ProcessingContainer) annotatorList.get(i);
if (container.getStatus() == Constants.CAS_PROCESSOR_DISABLED) {
continue; // skip over disabled CasProcessors
CasProcessor processor = container.getCasProcessor();
if (processor instanceof CasObjectProcessor) {
((CasObjectProcessor) processor).typeSystemInit(cas.getTypeSystem());
for (int i = 0; i < consumerList.size(); i++) {
ProcessingContainer container = (ProcessingContainer) consumerList.get(i);
if (container.getStatus() == Constants.CAS_PROCESSOR_DISABLED) {
continue; // skip over disabled CasProcessors
CasProcessor processor = container.getCasProcessor();
if (processor instanceof CasObjectProcessor) {
((CasObjectProcessor) processor).typeSystemInit(cas.getTypeSystem());
} catch (ResourceInitializationException e) {
// Already the declared exception type: rethrow unchanged.
throw e;
} catch (Exception e) {
throw new ResourceInitializationException(e);
} finally {
// NOTE(review): presumably returns the borrowed CAS to the pool; body not visible in this view.
synchronized (casPool) {
/**
 * Stops all Cas Processors and optionally changes their status according to the kill flag.
 *
 * For each annotator container: when {@code kill} is true, or the CPM was stopped while the
 * container was READY or RUNNING, the container is meant to be marked KILLED. Otherwise a container
 * that is not disabled is meant to be marked COMPLETED. The resulting status is saved as the
 * "ProcessorStatus" statistic, and the container's deployer (if any) is consulted. The consumer
 * list is then iterated.
 * NOTE(review): the status-setting, deployer-stop and consumer-shutdown statements are not visible
 * in this view.
 *
 * @param kill true if the CPE has been stopped before completing normally
 * @throws CasProcessorDeploymentException presumably propagated from stopping a deployer -- confirm
 */
public void stopCasProcessors(boolean kill) throws CasProcessorDeploymentException {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_stop_containers__FINEST",
new Object[] { Thread.currentThread().getName() });
// Stop all running CASProcessors
for (int i = 0; annotatorList != null && i < annotatorList.size(); i++) {
ProcessingContainer container = (ProcessingContainer) annotatorList.get(i);
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(), container.getName(),
String.valueOf(container.getTotalTime()) });
// The container's status is examined and updated while holding its monitor.
synchronized (container) {
// CasProcessor processor = container.getCasProcessor();
// Change the status of this container to KILLED if the CPM has been stopped
// before completing the collection and current status of CasProcessor is
// either READY or RUNNING
if (kill || (stopped && isProcessorReady(container.getStatus()))) {
} else {
// If the CasProcessor has not been disabled during processing change its
// status to COMPLETED.
if (container.getStatus() != Constants.CAS_PROCESSOR_DISABLED) {
saveStat("ProcessorStatus", String.valueOf(container.getStatus()), container);
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_stop_container__FINEST",
new Object[] { Thread.currentThread().getName(), container.getName() });
CasProcessorDeployer deployer = container.getDeployer();
if (deployer != null)
// Destroy Cas Consumers
for (int i = 0; consumerList != null && i < consumerList.size(); i++) {
ProcessingContainer container = (ProcessingContainer) consumerList.get(i);
* Returns collectionReader progress.
public Progress[] getProgress() {
if (collectionReader == null) {
return null;
return collectionReader.getProgress();
private HashMap getStatForContainer(ProcessingContainer aContainer) {
HashMap cpStatMap = null;
if (stats != null && (cpStatMap = (HashMap) stats.get(aContainer.getName())) != null) {
return cpStatMap;
return null;
private void saveStat(String aStatLabel, String aStatValue, ProcessingContainer aContainer) {
HashMap cpStatMap = getStatForContainer(aContainer);
if (cpStatMap != null) {
cpStatMap.put(aStatLabel, aStatValue);
* Check if the CASProcessor status is available for processing
private boolean isProcessorReady(int aStatus) {
if (aStatus == Constants.CAS_PROCESSOR_READY || aStatus == Constants.CAS_PROCESSOR_RUNNING) {
return true;
return false;
/**
 * Invalidates the given CASes.
 *
 * When the first CAS carries chunk metadata marking it as one of many chunks, its document id is
 * recorded in skippedDocs (only once), which skipDroppedDocument consults to skip the remaining
 * chunks of that document.
 * NOTE(review): the statements executed when producer != null and when outputQueue != null are not
 * visible in this view.
 *
 * @param aCASList CASes to invalidate; the chunk-metadata path reads aCASList[0]
 */
public void invalidateCASes(CAS[] aCASList) {
if (producer != null) {
} else {
ChunkMetadata meta = CPMUtils.getChunkMetadata(aCASList[0]);
if (meta != null && meta.isOneOfMany() && skippedDocs.containsKey(meta.getDocId()) == false) {
skippedDocs.put(meta.getDocId(), meta.getDocId());
if (outputQueue != null) {
/**
 * Releases the given CASes back to the pool.
 *
 * Non-null entries are handled while holding the casPool monitor. Null entries are only reported
 * at FINEST level.
 * NOTE(review): the release statement itself is not visible in this view.
 *
 * @param aCASList CAS list to release
 */
public void releaseCASes(CAS[] aCASList) {
for (int i = 0; i < aCASList.length; i++) {
if (aCASList[i] != null) {
// aCASList[i].reset();
synchronized (casPool) {
} else {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_release_tcas__FINEST",
new Object[] { Thread.currentThread().getName() });
* Overrides the default performance tuning settings for this CPE. This affects things such as CAS
* sizing parameters.
* @param aPerformanceTuningSettings
* the new settings
* @see UIMAFramework#getDefaultPerformanceTuningSettings()
public void setPerformanceTuningSettings(Properties aPerformanceTuningSettings) {
mPerformanceTuningSettings = aPerformanceTuningSettings;
* @return Returns the PerformanceTuningSettings.
public Properties getPerformanceTuningSettings() {
return mPerformanceTuningSettings;
* @param aPca
public void setProcessControllerAdapter(ProcessControllerAdapter aPca) {
pca = aPca;
* Return CPE Configuration params. Limit access to classes in the same package
protected CpeConfiguration getCpeConfig() throws Exception {
return cpeFactory.getCPEConfig();
/**
 * Called from the ProcessingUnits when they shut down due to having received the EOFToken. When
 * all ProcessingUnits have shut down, we put an EOFToken on the output queue so that the CAS
 * Consumers will also shut down. -Adam
 *
 * The method is synchronized on this engine; the output queue is accessed while holding its own
 * monitor.
 * NOTE(review): the decrement of activeProcessingUnits and the enqueue onto outputQueue are not
 * visible in this view.
 *
 * @param unit the processing unit that is shutting down
 */
synchronized void processingUnitShutdown(ProcessingUnit unit) {
if (activeProcessingUnits == 0 && outputQueue != null) {
Object[] eofToken = new Object[1];
eofToken[0] = new EOFToken();
synchronized (outputQueue) {
public boolean dropCasOnException() {
return dropCasOnExceptionPolicy;
/**
 * Obtains a CAS, wrapped in a one-element CAS[], and fills it by calling the collection reader's
 * getNext().
 *
 * If {@code entity} is already a CAS[], it is reused. Otherwise a CAS is taken from casPool,
 * retrying while the CPE is running and the pool returns null. Sofa-unaware readers (see
 * needsView()) receive the default-sofa view; sofa-aware readers receive the base CAS. Time spent
 * in the reader is added to crFetchTime, and readerState is updated as a progress marker. Only the
 * CollectionReader path is visible in this view.
 *
 * @param entity a previously obtained CAS[] to reuse, or any other value to request a new CAS
 * @param pTrTemp process trace passed to handleException on failure
 * @return the filled CAS[]; null if the CPE stopped while a CAS was being obtained, or if an
 *         exception occurred (after handleException has been called)
 */
private Object getCasWithSOFA(Object entity, ProcessTrace pTrTemp) {
CAS[] casList = new CAS[1];
// CasObject based CollectionReader does not support returning more than one CAS at a time. So
// fake support for this by calling its getNext() until the casList is filled to max capacity.
// The capacity of casList is equal to the CollectionReader fetchSize, defined in CR descriptor.
// NOTE(review): in this method casList always has exactly one slot, so the comment above looks stale.
try {
if (collectionReader instanceof CollectionReader) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_call_get_cas__FINEST",
new Object[] { Thread.currentThread().getName() });
if (entity != null && entity instanceof CAS[]) {
casList = (CAS[]) entity;
} else {
readerState = 1001;
// Spin until the pool hands out a CAS or the CPE stops.
// NOTE(review): busy-wait; presumably getCas(0) does not block -- consider a timed wait.
while (this.isRunning && (casList[0] = casPool.getCas(0)) == null)
; // intentionally empty while loop
entity = casList;
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
new Object[] { Thread.currentThread().getName(),
String.valueOf((casList[0] == null)) });
// The CPE was stopped while a CAS was being obtained: give up and return null.
if (this.isRunning() == false) {
readerState = 1009;
synchronized (casPool) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
readerState = 1010;
return null;
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName() });
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_call_cas_reset__FINEST",
new Object[] { Thread.currentThread().getName() });
boolean sofaUnaware = needsView();
readerState = 1003;
// Start of the collection reader fetch-time measurement.
long st00 = System.currentTimeMillis();
if (sofaUnaware) {
// sofa-unaware style CR, give it the initial view
CAS view = casList[0].getView(CAS.NAME_DEFAULT_SOFA);
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_call_cr_next__FINEST",
new Object[] { Thread.currentThread().getName(), "TCAS" });
((CollectionReader) collectionReader).getNext(view);
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), "TCAS" });
} else // sofa-aware CR, give it the base CAS
CAS baseCas = ((CASImpl) casList[0]).getBaseCAS();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_call_cr_next__FINEST",
new Object[] { Thread.currentThread().getName(), "CAS" });
((CollectionReader) collectionReader).getNext(baseCas);
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
new Object[] { Thread.currentThread().getName(), "CAS" });
crFetchTime += (System.currentTimeMillis() - st00);
entity = casList;
return entity;
} catch (Exception e) {
if (UIMAFramework.getLogger().isLoggable(Level.FINER)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
new Object[] { Thread.currentThread().getName(), e.getMessage() });
Thread.currentThread().getName() + "" + e);
// Report the failure (this may stop the CPE) and signal it to the caller with null.
handleException(e, casList, pTrTemp);
return null;
/**
 * Determines whether the collection reader must be given a view rather than the base CAS.
 *
 * The capabilities of the reader's CasInitializer (when it has one), or otherwise of the
 * collection reader itself, are looked up once and cached in definedCapabilities. If any
 * capability declares an output sofa, the reader is treated as sofa-aware.
 *
 * @return false if any defined capability declares an output sofa; otherwise the value of
 *         needsTCas (NOTE(review): its initialization is not visible in this view)
 */
private boolean needsView() {
if (definedCapabilities == null) {
// If the Collection Reader and CAS Initializer do not declare any output Sofas,
// they must be sent the default view (meaning whatever's mapped to _InitialView)
// for backward compatibility
CasInitializer casIni = ((CollectionReader) collectionReader).getCasInitializer();
if (casIni != null) {
definedCapabilities = casIni.getProcessingResourceMetaData().getCapabilities();
} else {
definedCapabilities = ((CollectionReader) collectionReader).getProcessingResourceMetaData()
for (int j = 0; j < definedCapabilities.length; j++) {
if (definedCapabilities[j].getOutputSofas().length > 0) {
needsTCas = false;
return needsTCas;
/**
 * Initialize the CPE.
 *
 * Creates the CPE CAS pool, sized by getPoolSize(), backed by the resource manager's CasManager and
 * configured with the current performance tuning settings.
 * NOTE(review): other initialization steps may exist but are not visible in this view.
 *
 * @throws Exception propagated from creating the CAS pool
 */
private void bootstrapCPE() throws Exception {
casPool = new CPECasPool(getPoolSize(), cpeFactory.getResourceManager().getCasManager(), mPerformanceTuningSettings);
/**
 * Setup single threaded pipeline.
 *
 * Creates a NonThreadedProcessingUnit for this engine, marks every annotator container as
 * CAS_PROCESSOR_RUNNING, and iterates the non-null status callback listeners.
 * NOTE(review): what is done with each listener is not visible in this view.
 *
 * @throws Exception propagated from pipeline setup
 */
private void setupProcessingPipeline() throws Exception {
// activeProcessingUnits = 1;
nonThreadedProcessingUnit = new NonThreadedProcessingUnit(this);
// Assign initial status to all Cas Processors in the processing pipeline
for (int i = 0; i < annotatorList.size(); i++) {
((ProcessingContainer) annotatorList.get(i)).setStatus(Constants.CAS_PROCESSOR_RUNNING);
for (int j = 0; j < statusCbL.size(); j++) {
BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
if (statCL != null)
/**
 * Setup Cas Consumer pipeline as single threaded.
 *
 * Does work only when the consumer list is non-empty. It creates a NonThreadedProcessingUnit for
 * the consumers, marks every consumer container as CAS_PROCESSOR_RUNNING, iterates the non-null
 * status callback listeners, and installs a timer. If the try block that sets the custom timer
 * throws, a default JavaTimer is used instead.
 * NOTE(review): the listener handling and the body of the try block are not visible in this view.
 *
 * @throws Exception propagated from pipeline setup
 */
private void setupConsumerPipeline() throws Exception {
if (consumerList != null && consumerList.size() > 0) {
nonThreadedCasConsumerProcessingUnit = new NonThreadedProcessingUnit(this);
// Assign initial status to all Cas Processors in the processing pipeline
for (int i = 0; i < consumerList.size(); i++) {
((ProcessingContainer) consumerList.get(i)).setStatus(Constants.CAS_PROCESSOR_RUNNING);
// Add Callback Listeners
for (int j = 0; j < statusCbL.size(); j++) {
BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
if (statCL != null)
// Notify Callback Listeners when done processing entity
// Add custom timer
try {
} catch (Exception e) {
// Use default Timer
nonThreadedCasConsumerProcessingUnit.setUimaTimer(new JavaTimer());
* Determines if a given CAS should be skipped
* @param entity -
* container for CAS
* @return
private boolean skipDroppedDocument(Object[] entity) {
if (entity instanceof CAS[]) {
ChunkMetadata meta = CPMUtils.getChunkMetadata((CAS) entity[0]);
if (meta != null && skippedDocs.containsKey(meta.getDocId())) {
return true;
return false;
/**
 * Runs the CPE in a single thread without queues.
 *
 * Each loop iteration, while the CPE is running:
 * - checks endOfProcessingReached();
 * - blocks while the CPM is paused;
 * - fetches the next CAS through getCasWithSOFA().
 * CASes whose document was dropped are reported to listeners with a SkipCasException and released.
 * Other CASes go through the analysis pipeline and then, if analysis succeeded, through the CAS
 * consumer pipeline. Any Throwable is passed to handleException, which may stop the CPE. The
 * finally block notifies listeners after successful processing and releases CAS[] entities.
 * NOTE(review): several statements (loop exits, counters, trace updates) are not visible in this view.
 *
 * @throws Exception propagated from processing
 */
public void runSingleThreaded() throws Exception {
Object entity = null;
isRunning = true;
ProcessTrace pTrTemp = getProcessTrace();
boolean success = true;
long entityCount = 0;
// long start = System.currentTimeMillis();
// Accumulated times: aggregate, processing pipeline, CAS consumers, collection reader.
long aggTime = 0;
long ppTime = 0;
long ccTime = 0;
long crTime = 0;
for (int j = 0; j < statusCbL.size(); j++) {
BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
if (statCL != null)
// Main loop: one CAS per iteration until the CPE stops running.
while (isRunning) {
try {
// Check if processed all entities as defined in the Cpe Descriptor.
if (endOfProcessingReached(entityCount)) {
waitForCpmToResumeIfPaused(); // blocks if CPM is paused
// check again the state of the cpm after pause
if (!isRunning)
readerState = 1000;
if (!collectionReader.hasNext())
long st0 = System.currentTimeMillis();
entity = getCasWithSOFA(entity, pTrTemp);
crTime += (System.currentTimeMillis() - st0);
// A null entity means getCasWithSOFA failed or the CPE stopped.
if (entity == null) {
success = false;
if (entity instanceof CAS[] && skipDroppedDocument((Object[]) entity)) {
notifyListeners(CAS_PROCESSED_MSG, (Object[]) entity, pTrTemp, new SkipCasException(
"Skipping Document Due To Dropped Cas in a Sequence"));
releaseCASes((CAS[]) entity);
} else {
// Clear the cache of bad documents
if (skippedDocs.size() > 0) {
long st1 = System.currentTimeMillis();
// If CAS has been dropped due to an exception don't call CasConsumer
success = nonThreadedProcessingUnit.analyze((Object[]) entity, pTrTemp);
ppTime += (System.currentTimeMillis() - st1);
if (success) {
long st2 = System.currentTimeMillis();
nonThreadedCasConsumerProcessingUnit.analyze((Object[]) entity, pTrTemp);
ccTime += (System.currentTimeMillis() - st2);
} catch (Throwable t) {
// may change the state of the isRunning on fatal exception
handleException(t, (Object[]) entity, pTrTemp);
success = false;
} finally {
// After successful analysis notify listeners. If there was an exception, it has
// already been reported
if (success) {
readerState = 2007;
if (entity == null) {
notifyListeners(CAS_PROCESSED_MSG, null, pTrTemp);
} else {
notifyListeners(CAS_PROCESSED_MSG, (Object[]) entity, pTrTemp);
if (entity != null && entity instanceof CAS[]) {
releaseCASes((CAS[]) entity);
entity = null;
// Update processing trace counts and timers
synchronized (procTr) {
long st = System.currentTimeMillis();
aggTime += (System.currentTimeMillis() - st);
} // while
* Determines if the CPM processed all documents
* @param entityCount -
* number of documents processed so far
* @return true if all documents processed, false otherwise
private boolean endOfProcessingReached(long entityCount) {
// Special case, -1 means all entities in the corpus
if (numToProcess == -1) {
return false;
} else if (numToProcess == 0) {
return true;
} else {
// check if exceeded or matched the configured max number of entities
return (entityCount >= numToProcess);
* Handle given exception
* @param t -
* exception to handle
* @param entity -
* CAS container
* @param aPTrace -
* process trace
private void handleException(Throwable t, Object[] entity, ProcessTrace aPTrace) {
if (t instanceof AbortCPMException || t instanceof Error) {
isRunning = false;
killed = true;
notifyListeners(CAS_PROCESSED_MSG, entity, aPTrace, t);
* @param aMsgType
* @param entity
* @param aPTrace
private void notifyListeners(int aMsgType, Object[] entity, ProcessTrace aPTrace) {
notifyListeners(aMsgType, entity, aPTrace, null);
private void notifyListeners(int aMsgType, Object[] entity, ProcessTrace aPTrace, Throwable t) {
// Add Callback Listeners
for (int j = 0; j < statusCbL.size(); j++) {
BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
if (statCL != null) {
EntityProcessStatusImpl eps = new EntityProcessStatusImpl(aPTrace);
// eps = new EntityProcessStatusImpl(aPTrace);
if (entity == null) {
if (t != null) {
eps.addEventStatus("Process", "Failed", t);
((StatusCallbackListener) statCL).entityProcessComplete(null, eps);
} else {
for (int i = 0; i < entity.length; i++) {
if (t != null) {
eps.addEventStatus("Process", "Failed", t);
if (entity[i] != null && entity[i] instanceof CAS) {
callEntityProcessCompleteWithCAS((StatusCallbackListener)statCL, (CAS)entity[i], eps);
// ((StatusCallbackListener) statCL).entityProcessComplete((CAS) entity[i], eps);
} else {
((StatusCallbackListener) statCL).entityProcessComplete(null, eps);
* Internal use only; public for cross-package access. Switches class loaders and locks the CAS.
* @param statCL status call back listener
* @param cas cas
* @param eps entity process status
// Delivers the entityProcessComplete callback for a single CAS to the given listener.
// NOTE(review): the accompanying Javadoc says this switches class loaders and locks the CAS,
// but no such call is visible in this block. As written, the `if (null != cas)` in the try
// block guards the callback itself, so a null CAS would produce no callback. The
// `if (null != cas)` in the finally block has no visible statement. Confirm that the
// lock/switch and unlock/restore statements guarded by these checks are present.
public static void callEntityProcessCompleteWithCAS(StatusCallbackListener statCL, CAS cas, EntityProcessStatus eps) {
try {
if (null != cas)
statCL.entityProcessComplete(cas, eps);
} finally {
// NOTE(review): the statement guarded by this null check is not visible here.
if (null != cas)
private ProcessTrace getProcessTrace() throws Exception {
ProcessTrace pT = null;
UimaTimer uTimer = getTimer();
if (uTimer != null) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_use_custom_timer__FINEST",
new Object[] { Thread.currentThread().getName(), uTimer.getClass().getName() });
pT = new ProcessTrace_impl(uTimer, this.getPerformanceTuningSettings());
} else {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_use_default_timer__FINEST",
new Object[] { Thread.currentThread().getName() });
pT = new ProcessTrace_impl(this.getPerformanceTuningSettings());
return pT;
* Stop and cleanup single-threaded CPE.
private void tearDownCPE() {
// NOTE(review): no statements of this method's body are visible here, although its Javadoc
// says it stops and cleans up the single-threaded CPE. Confirm the cleanup logic is present.
// Intended to block the calling thread while the CPM is paused. The processing loop calls it
// before handling each entity ("blocks if CPM is paused"). The `pause` flag is checked while
// holding the monitor of lockForPause.
// NOTE(review): this method may continue beyond the visible lines.
private void waitForCpmToResumeIfPaused() {
synchronized (lockForPause) {
// Pause this thread if CPM has been paused
while (pause) {
// threadState = 2016; thread state is not kept here, only in the ProcessingUnit
// NOTE(review): this guard checks the root logger, but the message is written through the
// class logger; their levels may differ.
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_pausing_pp__FINEST",
new Object[] { Thread.currentThread().getName() });
// NOTE(review): the try block below contains only a comment. No lockForPause.wait() call is
// visible, yet InterruptedException is caught. Without the wait, the loop would spin while
// `pause` is true. Confirm the wait() call is present.
try {
// Wait until resumed
// NOTE(review): the interrupt does not appear to be re-asserted with
// Thread.currentThread().interrupt(), which would lose the thread's interrupt status.
// Confirm how interruption is intended to be handled.
} catch (InterruptedException e) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_resuming_pp__FINEST",
new Object[] { Thread.currentThread().getName() });