blob: 465385be50a65d904da58b5cc42fc1db256d3fee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.collection.impl.cpm.engine;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import org.apache.uima.UIMAFramework;
import org.apache.uima.analysis_engine.AnalysisEngine;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas_data.CasData;
import org.apache.uima.collection.CollectionException;
import org.apache.uima.collection.EntityProcessStatus;
import org.apache.uima.collection.StatusCallbackListener;
import org.apache.uima.collection.base_cpm.AbortCPMException;
import org.apache.uima.collection.base_cpm.AbortCasProcessorException;
import org.apache.uima.collection.base_cpm.BaseStatusCallbackListener;
import org.apache.uima.collection.base_cpm.CasDataConsumer;
import org.apache.uima.collection.base_cpm.CasDataProcessor;
import org.apache.uima.collection.base_cpm.CasDataStatusCallbackListener;
import org.apache.uima.collection.base_cpm.CasObjectProcessor;
import org.apache.uima.collection.base_cpm.CasProcessor;
import org.apache.uima.collection.base_cpm.SkipCasException;
import org.apache.uima.collection.impl.CasConverter;
import org.apache.uima.collection.impl.EntityProcessStatusImpl;
import org.apache.uima.collection.impl.base_cpm.container.KillPipelineException;
import org.apache.uima.collection.impl.base_cpm.container.ProcessingContainer;
import org.apache.uima.collection.impl.base_cpm.container.ServiceConnectionException;
import org.apache.uima.collection.impl.base_cpm.container.deployer.CasProcessorDeployer;
import org.apache.uima.collection.impl.cpm.Constants;
import org.apache.uima.collection.impl.cpm.container.CasObjectNetworkCasProcessorImpl;
import org.apache.uima.collection.impl.cpm.container.NetworkCasProcessorImpl;
import org.apache.uima.collection.impl.cpm.container.ProcessingContainer_Impl;
import org.apache.uima.collection.impl.cpm.utils.CPMUtils;
import org.apache.uima.collection.impl.cpm.utils.ChunkMetadata;
import org.apache.uima.collection.impl.cpm.utils.CpmLocalizedMessage;
import org.apache.uima.collection.impl.cpm.vinci.DATACasUtils;
import org.apache.uima.collection.metadata.CpeConfiguration;
import org.apache.uima.resource.ResourceConfigurationException;
import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.util.Level;
import org.apache.uima.util.ProcessTrace;
import org.apache.uima.util.UimaTimer;
import org.apache.uima.util.impl.ProcessTrace_impl;
/**
* This component executes the processing pipeline. Running in a separate thread, it continuously
* reads bundles of Cas from the Work Queue filled by {@link ArtifactProducer} and sends them through
* the configured CasProcessors. The sequence in which CasProcessors are invoked is defined by the
* order of the Cas Processor listing in the cpe descriptor. The results of analysis produced by Cas
* Processors are enqueued onto an output queue that is shared with Cas Consumers.
*
*
*/
public class ProcessingUnit extends Thread {
// Numeric progress marker written by this thread as it moves through its work, e.g. 2000 at the
// top of the run() loop, 2001 just before dequeue and 2016 while waiting for the CPM to resume.
public int threadState = 0;
// Pool that CASes are released to and fetched from. It is not assigned anywhere in the visible
// part of this class - presumably set by a setter or subclass further down; verify.
protected CPECasPool casPool;
// When true, run() clears the CAS cache after every loop iteration and processNext() notifies
// listeners and releases CASes once the last container in the pipeline has been reached.
protected boolean releaseCAS = false;
// Engine controlling the life cycle of the CPE this unit belongs to.
protected CPMEngine cpm = null;
// Queue this unit dequeues its work from.
protected BoundedWorkQueue workQueue = null;
// Queue this unit deposits results of analysis to.
protected BoundedWorkQueue outputQueue = null;
// Converts CasData to CasObject and vice versa.
protected CasConverter mConverter;
// Trace into which the per-bundle ProcessTrace objects are aggregated.
protected ProcessTrace processingUnitProcessTrace;
// ProcessingContainer instances, called sequentially during processing.
protected LinkedList processContainers = new LinkedList();
// NOTE(review): not referenced in the visible part of this class - confirm it is still needed.
protected long numToProcess = 0;
// CAS array rebuilt by processNext() each time a CasObjectProcessor is invoked.
protected CAS[] casList;
// Registered BaseStatusCallbackListener instances.
protected ArrayList statusCbL = new ArrayList();
// Flag set through setNotifyListeners(boolean).
protected boolean notifyListeners = false;
// NOTE(review): not referenced in the visible part of this class - confirm it is still needed.
protected CAS conversionCas = null;
// Bundle most recently dequeued by run(); also re-enqueued as the EOF token by handleEOFToken().
protected Object[] artifact = null;
// Single-element CAS array allocated by every constructor.
protected CAS[] conversionCasArray;
// Optional custom timer; when null a ProcessTrace with the default timer is created.
protected UimaTimer timer;
// Name of the thread that paused a container in processNext(); null until that happens.
protected String threadId = null;
// CPE configuration obtained from the engine; stays null when the lookup in the constructor fails.
protected CpeConfiguration cpeConfiguration = null;
// CASes kept aside so they can be reused for CasData <-> CAS conversion; returned to the pool by
// clearCasCache().
private CAS[] casCache = null;
// True once setCasConsumerPipelineIdentity() has been called.
private boolean isCasConsumerPipeline = false;
// True while the run() loop is active.
private boolean isRunning = false;
// NOTE(review): timer01..timer06 are public and not referenced in the visible part of this class;
// presumably ad-hoc profiling counters - confirm before relying on them.
public long timer01 = 0;
public long timer02 = 0;
public long timer03 = 0;
public long timer04 = 0;
public long timer05 = 0;
public long timer06 = 0;
/**
 * Creates a processing unit that is not yet attached to a CPM engine or to any queue. The engine
 * and the queues can be supplied afterwards through the corresponding setters.
 */
public ProcessingUnit() {
  // Converter between the CasData and the CasObject representations
  this.mConverter = new CasConverter();
  this.conversionCasArray = new CAS[1];
}
/**
 * Creates a processing unit that is bound to the given engine and to the given pair of queues.
 *
 * @param acpm
 *          component managing the life cycle of the CPE
 * @param aInputQueue
 *          queue this unit reads its work from
 * @param aOutputQueue
 *          queue this unit writes its results to
 */
public ProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue) {
  // The engine reference, the best-effort CPE configuration lookup, the converter and the
  // conversion array are all set up by the single-argument constructor.
  this(acpm);
  this.workQueue = aInputQueue;
  this.outputQueue = aOutputQueue;
  maybeLogFinestWorkQueue("UIMA_CPM_initialize_pipeline__FINEST", workQueue);
}
/**
 * Creates a processing unit that is bound to the given engine but has no queues yet.
 *
 * @param acpm
 *          component managing the life cycle of the CPE
 */
public ProcessingUnit(CPMEngine acpm) {
  this.cpm = acpm;
  try {
    this.cpeConfiguration = acpm.getCpeConfig();
  } catch (Exception ignored) {
    // Best effort only: without a configuration the field stays null and run() falls back to
    // its default wait time.
  }
  // Converter between the CasData and the CasObject representations
  this.mConverter = new CasConverter();
  this.conversionCasArray = new CAS[1];
}
/**
 * Reports whether the processing loop of this unit is currently active. The flag is raised by
 * {@link #run()} right before it enters its loop and lowered again when the loop has been left.
 *
 * @return true while the loop in {@link #run()} is active, false otherwise
 */
public boolean isRunning() {
  return this.isRunning;
}
/**
 * Marks this instance as a Cas Consumer pipeline. There is no way to revert the marking.
 */
public void setCasConsumerPipelineIdentity() {
  this.isCasConsumerPipeline = true;
}
/**
 * Tells whether this instance has been marked as a Cas Consumer pipeline.
 *
 * @return true if {@link #setCasConsumerPipelineIdentity()} has been called, false otherwise
 */
public boolean isCasConsumerPipeline() {
  return this.isCasConsumerPipeline;
}
/**
 * Replaces the queue from which this processing unit reads its bundles of Cas. This is an
 * alternative to passing the queue to the constructor.
 *
 * @param aInputQueue
 *          the queue to read from
 */
public void setInputQueue(BoundedWorkQueue aInputQueue) {
  this.workQueue = aInputQueue;
}
/**
 * Replaces the queue into which this processing unit deposits the results of analysis. This is
 * an alternative to passing the queue to the constructor.
 *
 * @param aOutputQueue
 *          the queue to write to
 */
public void setOutputQueue(BoundedWorkQueue aOutputQueue) {
  this.outputQueue = aOutputQueue;
}
/**
 * Replaces the reference to the component that manages the life cycle of the CPE. Only the
 * reference is stored; the CPE configuration read by the constructors is left untouched.
 *
 * @param acpm
 *          reference to the controlling engine
 */
public void setCPMEngine(CPMEngine acpm) {
  this.cpm = acpm;
}
/**
 * Nulls out the fields of this object. Call this only when this object is no longer needed.
 * <p>
 * The container list is emptied before the reference to it is dropped. Calling this method more
 * than once is harmless.
 */
public void cleanup() {
  this.casPool = null;
  this.cpm = null;
  this.workQueue = null;
  this.outputQueue = null;
  this.mConverter = null;
  this.processingUnitProcessTrace = null;
  // The list is already null after a previous cleanup() or after setContainers(null); guard so
  // that a repeated call does not fail with a NullPointerException.
  if (this.processContainers != null) {
    this.processContainers.clear();
    this.processContainers = null;
  }
  this.casList = null;
  this.conversionCas = null;
  this.artifact = null;
  this.statusCbL = null;
  this.conversionCasArray = null;
}
/**
 * Stores the flag that says whether notifications should be made via the configured listeners.
 *
 * @param aDoNotify
 *          true if notification is required, false otherwise
 */
public void setNotifyListeners(boolean aDoNotify) {
  this.notifyListeners = aDoNotify;
}
/**
 * Registers a listener object to be used for notifications. The listener is appended to the
 * internal list without any duplicate check.
 *
 * @param aListener
 *          {@link org.apache.uima.collection.base_cpm.BaseStatusCallbackListener} instance
 */
public void addStatusCallbackListener(BaseStatusCallbackListener aListener) {
  this.statusCbL.add(aListener);
}
/**
 * Returns the listeners used by this PU for callbacks. The internal list itself is handed out,
 * not a copy, so changes made by the caller are visible to this object.
 *
 * @return list of {@link org.apache.uima.collection.base_cpm.BaseStatusCallbackListener}
 *         instances
 */
public ArrayList getCallbackListeners() {
  return this.statusCbL;
}
/**
 * Unregisters the given listener. Nothing happens if the listener is not in the list.
 *
 * @param aListener
 *          object to remove from the list
 */
public void removeStatusCallbackListener(BaseStatusCallbackListener aListener) {
  this.statusCbL.remove(aListener);
}
/**
 * Supplies the ProcessTrace object into which this unit aggregates the statistics it collects
 * for each processed bundle.
 *
 * @param aProcessingUnitProcessTrace
 *          object used to compile stats
 */
public void setProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace) {
  this.processingUnitProcessTrace = aProcessingUnitProcessTrace;
}
/**
 * Plugs in the custom timer used by the PU for getting time. Passing null removes a previously
 * set timer, in which case process traces are created with the default timer.
 *
 * @param aTimer
 *          custom timer to use, or null to fall back to the default timer
 */
public void setUimaTimer(UimaTimer aTimer) {
  timer = aTimer;
  // Only a real timer has a class name to report; a null timer is a valid "use default" state
  // and must not make this setter fail.
  if (timer != null) {
    maybeLogFinest("UIMA_CPM_timer_class__FINEST", timer.getClass().getName());
  }
}
/**
 * Replaces the list of Cas Processor containers that make up the pipeline. During processing the
 * Cas Processors in this list are called sequentially, in list order. The given list is stored
 * as is, not copied.
 *
 * @param processorList
 *          containers of the Cas Processors forming the processing pipeline
 */
public void setContainers(LinkedList processorList) {
  this.processContainers = processorList;
}
/**
 * Disables a Cas Processor in the processing pipeline, locating it by the provided index. The
 * disabled Cas Processor remains in the pipeline, but its container status is set to
 * {@code Constants.CAS_PROCESSOR_DISABLED}. An index outside of the pipeline is silently ignored.
 *
 * @param aCasProcessorIndex
 *          position in the pipeline of the Cas Processor to disable
 */
public void disableCasProcessor(int aCasProcessorIndex) {
  // Valid positions are 0 .. size() - 1; size() itself is already out of range.
  if (aCasProcessorIndex < 0 || aCasProcessorIndex >= processContainers.size()) {
    return;
  }
  // Retrieve the container with a reference to the CasProcessor
  ProcessingContainer pc = ((ProcessingContainer) processContainers.get(aCasProcessorIndex));
  if (pc != null) {
    pc.setStatus(Constants.CAS_PROCESSOR_DISABLED);
    maybeLogFinest("UIMA_CPM_disabled_cp__FINEST", pc);
  }
}
/**
 * Disables every Cas Processor whose container carries the given name by setting the container
 * status to {@code Constants.CAS_PROCESSOR_DISABLED}. Containers with a different name are left
 * untouched.
 *
 * @param aCasProcessorName
 *          name of the Cas Processor to disable
 */
public void disableCasProcessor(String aCasProcessorName) {
  for (int position = 0; position < processContainers.size(); position++) {
    ProcessingContainer candidate = (ProcessingContainer) processContainers.get(position);
    if (!candidate.getName().equals(aCasProcessorName)) {
      continue;
    }
    candidate.setStatus(Constants.CAS_PROCESSOR_DISABLED);
    maybeLogFinest("UIMA_CPM_disabled_cp__FINEST", candidate);
  }
}
/**
 * Enables every Cas Processor whose container carries the given name by setting the container
 * status to {@code Constants.CAS_PROCESSOR_RUNNING}. Containers with a different name are left
 * untouched.
 *
 * @param aCasProcessorName
 *          name of the Cas Processor to enable
 */
public void enableCasProcessor(String aCasProcessorName) {
  for (int position = 0; position < processContainers.size(); position++) {
    ProcessingContainer candidate = (ProcessingContainer) processContainers.get(position);
    if (!candidate.getName().equals(aCasProcessorName)) {
      continue;
    }
    // The log entry is written before the status is switched
    maybeLogFinest("UIMA_CPM_enabled_cp__FINEST", candidate);
    candidate.setStatus(Constants.CAS_PROCESSOR_RUNNING);
  }
}
/**
 * Creates a fresh {@link ProcessTrace} configured with the performance tuning settings of the
 * CPM. The custom timer is used when one has been set, otherwise the default timer applies.
 *
 * @return a new ProcessTrace instance
 */
private ProcessTrace getProcessTrace() {
  if (timer == null) {
    maybeLogFinest("UIMA_CPM_use_default_timer__FINEST");
    return new ProcessTrace_impl(cpm.getPerformanceTuningSettings());
  }
  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    logFinest("UIMA_CPM_use_custom_timer__FINEST", timer.getClass().getName());
  }
  return new ProcessTrace_impl(timer, cpm.getPerformanceTuningSettings());
}
/**
 * Reacts to the EOFToken that run() has just dequeued. The token signals termination: it may
 * have been generated by the ArtifactProducer at the end of the collection, or placed in the
 * Work Queue by the CPM itself to force all processing threads to stop.
 * <p>
 * A Cas Consumer pipeline does nothing here beyond logging. Any other pipeline hands the token
 * on to its sibling threads (if there are any) and reports its own shutdown to the CPM.
 *
 * @throws Exception
 *           passed on from the queue or from the CPM engine
 */
private void handleEOFToken() throws Exception {
  maybeLogFinest("UIMA_CPM_got_eof_token__FINEST");
  if (isCasConsumerPipeline()) {
    return;
  }
  // With more than one processing thread (multi-pipeline mode, configured via the CPE
  // descriptor) the token goes back into the work queue so that the next thread can terminate
  // too. The enqueue call itself wakes up the waiting threads.
  if (cpm.getThreadCount() > 1) {
    workQueue.enqueue(artifact);
  }
  if (outputQueue != null) {
    maybeLogFinest("UIMA_CPM_placed_eof_in_queue__FINEST", outputQueue.getName());
  }
  // The EOFToken must not be placed on the output queue from here: with multiple processing
  // pipelines the CAS Consumers could receive it before the CASes that the other pipelines are
  // still working on. Instead the CPMEngine is told that this pipeline is shutting down and
  // decides on its own when to put the EOFToken on the output queue.
  cpm.processingUnitShutdown(this);
}
/**
 * Returns every non-null CAS of the given array to the CAS Pool and nulls its slot in the array.
 * This method is only used when the chunk-aware queue is used. When a document is chunked, each
 * chunk represents a portion of the document and the Cas Consumer ingests the chunks in
 * sequential order. Since chunks are processed asynchronously in a multi pipeline configuration,
 * they may arrive in the queue out of sequence, and the Cas Consumer then waits for the expected
 * chunk. If that chunk does not arrive within the configured interval, all related chunks
 * (CASes) are marked as timed out and end up here.
 *
 * @param artifact
 *          an array of CAS instances; its slots are nulled as the instances are released
 */
private void releaseTimedOutCases(Object[] artifact) {
  for (int slot = 0; slot < artifact.length; slot++) {
    Object timedOut = artifact[slot];
    if (timedOut == null) {
      continue;
    }
    // Hand the timed out CAS back to the pool; releaseCas takes care of notifying waiters
    casPool.releaseCas((CAS) timedOut);
    artifact[slot] = null;
  }
}
/**
* Blocks the calling thread for as long as the CPM reports that it is paused. Returns immediately
* if the CPM is not paused. Despite its name this method does not return a value.
* <p>
* The wait happens on the monitor of cpm.lockForPause, so whoever resumes the CPM is expected to
* notify on that same object - presumably done by the CPMEngine; verify against its resume code.
*/
private void isCpmPaused() {
synchronized (cpm.lockForPause) {
// Pause this thread if CPM has been paused
while (cpm.isPaused()) {
// 2016 marks this thread as waiting for the CPM to be resumed
threadState = 2016;
maybeLogFinest("UIMA_CPM_pausing_pp__FINEST");
try {
// Wait until resumed
cpm.lockForPause.wait();
} catch (InterruptedException e) {
// Deliberately ignored: the loop condition re-checks the pause state.
// NOTE(review): the interrupt status of the thread is not restored here.
}
maybeLogFinest("UIMA_CPM_resuming_pp__FINEST");
}
}
}
/**
 * Runs the Processing Pipeline thread. The thread waits for an artifact to arrive on the
 * configured Work Queue. Once a bundle arrives, it is removed from the queue and sent through
 * the analysis pipeline via processNext(). The loop ends when an EOFToken is dequeued or when a
 * KillPipelineException is reported by the pipeline.
 * <p>
 * Returns immediately, after logging a warning, if the CPM is not running. A work unit that is
 * marked as timed out is not processed: listeners are notified about every CAS it carries, and
 * afterwards all of its CASes are released back to the pool. Any other exception raised while
 * processing a bundle is logged and the CPM is asked to kill itself; the loop keeps going until
 * one of the two terminating conditions above occurs.
 */
public void run() {
  if (!cpm.isRunning()) {
    UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(),
            "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cpm_not_running__WARNING",
            new Object[] { Thread.currentThread().getName() });
    return;
  }
  maybeLogFinestWorkQueue("UIMA_CPM_start_pp__FINEST", workQueue);
  // Assign initial status to all Cas Processors in the processing pipeline
  for (int i = 0; i < processContainers.size(); i++) {
    ((ProcessingContainer) processContainers.get(i)).setStatus(Constants.CAS_PROCESSOR_RUNNING);
  }
  // Continue until CPE is stopped
  boolean run = true;
  // Stays 0 unless the CPE configuration provides a positive wait time
  int maxWaitTimeForEntity = 0;
  if (cpeConfiguration != null && cpeConfiguration.getMaxTimeToWait() > 0) {
    maxWaitTimeForEntity = cpeConfiguration.getMaxTimeToWait();
  }
  isRunning = true;
  while (run) {
    threadState = 2000; // Start the Loop
    // blocks if CPM is in pause state
    isCpmPaused();
    maybeLogFinestWorkQueue("UIMA_CPM_dequeue_artifact__FINEST", workQueue);
    artifact = null;
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      logFinest("UIMA_CPM_maxwait_for_artifact__FINEST", String.valueOf(maxWaitTimeForEntity));
    }
    threadState = 2001; // Entering dequeue()
    // D E Q U E U E *************************************
    // maxWaitTimeForEntity is either the configured positive value or 0, so one call covers
    // both cases.
    Object entity = workQueue.dequeue(maxWaitTimeForEntity);
    if (entity == null) {
      maybeLogFinest("UIMA_CPM_queue_empty__FINEST", workQueue.getName());
      continue;
    }
    try {
      if (entity instanceof WorkUnit) {
        WorkUnit workUnit = (WorkUnit) entity;
        artifact = (Object[]) workUnit.get();
        if (workUnit.isTimedOut() && artifact instanceof CAS[]) {
          // Tell the listeners about every CAS of the timed out bundle first ...
          for (int i = 0; i < artifact.length && artifact[i] != null; i++) {
            ChunkMetadata meta = CPMUtils.getChunkMetadata((CAS) artifact[i]);
            EntityProcessStatusImpl enProcSt = new EntityProcessStatusImpl(
                    processingUnitProcessTrace);
            String reason;
            if (meta != null) {
              reason = "Dropping CAS due chunk Timeout. Doc Id::" + meta.getDocId()
                      + " Sequence:" + meta.getSequence();
            } else {
              reason = "Dropping CAS due chunk Timeout. Chunk Metadata is not available.";
            }
            enProcSt.addEventStatus("Process", "Failed", new SkipCasException(reason));
            doNotifyListeners(artifact[i], true, enProcSt);
          }
          // ... and only then release them. Releasing nulls out every slot of the array, so
          // doing it inside the loop above would end the loop after the first CAS and the
          // remaining CASes would never be reported.
          releaseTimedOutCases(artifact);
          continue;
        }
        if (workUnit.getCas() != null) {
          casCache = workUnit.getCas();
        }
      } else {
        artifact = (Object[]) entity;
      }
      ProcessTrace pT = getProcessTrace();
      Object[] cases = artifact;
      // Check if the artifact is actually an EOFToken. If so, this is the
      // marker that indicates end of processing. The assumption is that
      // there will not be anything enqueued after the EOFToken.
      if (cases.length > 0 && cases[0] instanceof EOFToken) {
        threadState = 2002; // End
        run = false;
        handleEOFToken();
        break; // Terminate Loop
      }
      maybeLogFinest("UIMA_CPM_call_processNext__FINEST");
      /* *********** EXECUTE PIPELINE ************ */
      processNext(artifact, pT);
      if (System.getProperty("DEBUG_EVENTS") != null) {
        maybeLogFinest("UIMA_CPM_dump_events__FINEST");
        CPMUtils.dumpEvents(pT);
      }
      // Update processing trace counts and timers
      synchronized (processingUnitProcessTrace) {
        processingUnitProcessTrace.aggregate(pT);
      }
    } catch (ResourceProcessException e) {
      maybeLogSevereException(e);
      if (e.getCause() instanceof KillPipelineException) {
        cpm.pipelineKilled(Thread.currentThread().getName());
        releaseCAS = true;
        break; // terminate the thread
      }
      threadState = 2003; // Killing
      this.cpm.killIt();
    } catch (Exception e) {
      maybeLogSevereException(e);
      threadState = 2003; // Killing
      this.cpm.killIt();
    } finally {
      if (releaseCAS) {
        clearCasCache();
      }
    }
  }
  maybeLogFinestWorkQueue("UIMA_CPM_exit_pp__FINEST", workQueue);
  // Always clear the cas cache on exit.
  clearCasCache();
  maybeLogFinest("UIMA_CPM_pp_terminated__FINEST");
  isRunning = false;
}
/**
 * Hands every CAS that is still held in the Cas Cache back to the Cas Pool and then drops the
 * cache. The Cas Cache is an optimization that keeps a CAS around in case it is needed for
 * conversion, specifically in configurations that mix XCAS and CAS based AEs. Does nothing if
 * there is no cache.
 */
private void clearCasCache() {
  if (casCache == null) {
    return;
  }
  for (int slot = 0; slot < casCache.length; slot++) {
    CAS cached = casCache[slot];
    if (cached == null) {
      continue;
    }
    maybeLogFinest("UIMA_CPM_release_cas_from_cache__FINEST");
    // releaseCas takes care of notifying threads waiting on the pool
    casPool.releaseCas(cached);
    maybeLogFinest("UIMA_CPM_release_cas_from_cache_done__FINEST");
  }
  casCache = null;
}
/**
 * Takes one bundle from the input queue and sends it through the pipeline. Calling this
 * repeatedly makes sure that bundles still sitting in the queue get processed before the CPE
 * terminates.
 * <p>
 * Only bundles of CASes are consumed: anything that is not an object array, an empty array, an
 * array whose first element is null, and the EOFToken are all ignored. Exceptions raised while
 * processing are logged and not propagated.
 *
 * @return true if a bundle was dequeued and processed without an exception, false otherwise
 */
public boolean consumeQueue() {
  maybeLogFinestWorkQueue("UIMA_CPM_dequeue_artifact__FINEST", workQueue);
  // Dequeue first bundle. A local name distinct from the 'artifact' field avoids shadowing it.
  Object dequeued = workQueue.dequeue();
  maybeLogFinest("UIMA_CPM_dequeued_artifact__FINEST", workQueue.getName());
  if (!(dequeued instanceof Object[])) {
    // Covers both an empty queue (null) and anything that is not a bundle
    return false;
  }
  Object[] bundle = (Object[]) dequeued;
  // Only consume CASs. The length check keeps an empty bundle from raising an
  // ArrayIndexOutOfBoundsException that would be reported as a severe error.
  if (bundle.length == 0 || bundle[0] == null || bundle[0] instanceof EOFToken) {
    return false;
  }
  try {
    ProcessTrace pT = new ProcessTrace_impl(cpm.getPerformanceTuningSettings());
    maybeLogFinest("UIMA_CPM_call_processNext__FINEST");
    processNext(bundle, pT);
    maybeLogFinest("UIMA_CPM_call_processNext_done__FINEST");
    // Update processing trace counts and timers
    synchronized (processingUnitProcessTrace) {
      processingUnitProcessTrace.aggregate(pT);
    }
    return true;
  } catch (Exception e) {
    maybeLogSevereException(e);
  }
  return false;
}
/**
* Executes the processing pipeline. Given bundle of Cas instances is processed by each Cas
* Processor in the pipeline. Conversions between different types of Cas Processors is done on the
* fly. Two types of Cas Processors are currently supported:
*
* <ul>
* <li> CasDataProcessor</li>
* <li> CasObjectProcessor</li>
* </ul>
*
* The first operates on instances of CasData the latter operates on instances of CAS. The results
* produced by Cas Processors are added to the output queue.
*
* @param aCasObjectList - bundle of Cas to analyze
* @param pTrTemp - object used to aggregate stats
* @return -
* @throws AbortCPMException -
* @throws ResourceProcessException -
* @throws CollectionException -
* @throws IOException -
* @throws KillPipelineException -
*/
protected boolean processNext(Object[] aCasObjectList, ProcessTrace pTrTemp)
throws ResourceProcessException, IOException, CollectionException, AbortCPMException,
KillPipelineException {
maybeLogFinest("UIMA_CPM_start_analysis__FINEST");
// String lastDocId = "";
CasProcessor processor = null;
// This is used to hold an index of the current CasObject
// int currentIndex = -1;
boolean doneAlready = false;
// If there are no CASes in the list, return false since there is nothing else to do
if (aCasObjectList == null || aCasObjectList[0] == null) {
maybeLogFinest("UIMA_CPM_invalid_cas_reference__SEVERE");
return false;
}
Object[] casObjects = null;
// Determine if the Cas'es contained in the CasList are of type CAS. Samples the first CAS in
// the list.
// The list contains CASes of the same type ( either CasData or CAS ). Mixed model not
// supported.
boolean isCasObject = aCasObjectList[0] instanceof CAS;
// String docid = "";
maybeLogFinest("UIMA_CPM_entering_pipeline__FINEST");
ProcessingContainer container = null;
// *******************************************
// ** P R O C E S S I N G P I P E L I N E **
// *******************************************
// Send Cas Object through the processing pipeline.
for (int i = 0; processContainers != null && i < processContainers.size(); i++) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
logFinest("UIMA_CPM_retrieve_container__FINEST", String.valueOf(i));
}
container = (ProcessingContainer) processContainers.get(i);
synchronized (container) {
// Check to see if the CasProcessor is available for processing
if (!isProcessorReady(container.getStatus())) {
maybeLogFinest("UIMA_CPM_container_not_ready__FINEST", container);
boolean breakTheLoop = (i == (processContainers.size() - 1));
if (breakTheLoop && isCasObject) {
releaseCases(aCasObjectList, true, container.getName());
break;
}
// Skip any CasProcessor that is not ready to process
continue;
}
}
// Check if any of the Cas'es in the set has a required feature structure.
if (!isCasObject && !container.processCas(aCasObjectList)) {
maybeLogFinest("UIMA_CPM_skip_CAS__FINEST", container);
container.incrementFilteredCount(aCasObjectList.length);
container.logAbortedCases(aCasObjectList);
continue;
}
long byteCount;
// Flag controlling do-while loop that facilitates retries. Retries are defined in the
// CasProcessor configuration.
boolean retry = false;
// Retry Loop.
do {
if (System.getProperty("SHOW_MEMORY") != null) {
maybeLogMemoryFinest();
}
maybeLogFinest("UIMA_CPM_checkout_cp_from_container__FINEST", container);
threadState = 2004; // Entering dequeue()
processor = container.getCasProcessor();
if (processor == null) {
maybeLogSevere("UIMA_CPM_checkout_null_cp_from_container__SEVERE", container.getName());
throw new ResourceProcessException(CpmLocalizedMessage.getLocalizedMessage(
CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_EXP_invalid_component_reference__WARNING", new Object[] {
Thread.currentThread().getName(), "CasProcessor", "NULL" }), null);
}
// Check to see if the CasProcessor is available for processing
// Container may have been disabled by another thread, so first check
if (!isProcessorReady(container.getStatus())) {
maybeLogFinest("UIMA_CPM_container_not_ready__FINEST", container);
if (container.getStatus() == Constants.CAS_PROCESSOR_KILLED) {
container.releaseCasProcessor(processor);
// Another thread has initiated CPM Abort. That Thread has already notified
// the application of the Abort. Here we just return as the CPM has been
// killed most likely due to excessive errors.
return false;
}
// Skip any CasProcessor that is not ready to process
break;
}
maybeLogFinest("UIMA_CPM_checkedout_cp_from_container__FINEST", container, processor);
try {
if (processor instanceof CasDataProcessor) {
maybeLogFinest("UIMA_CPM_cas_data_processor__FINEST", container, processor);
pTrTemp.startEvent(container.getName(), "Process", "");
if (isCasObject == true) {
CasData[] casDataObjects = new CasData[aCasObjectList.length];
for (int casIndex = 0; casIndex < aCasObjectList.length; casIndex++) {
casDataObjects[casIndex] = mConverter
.casContainerToCasData((CAS) aCasObjectList[casIndex]);
if ((CAS) aCasObjectList[casIndex] != null) {
((CAS) aCasObjectList[casIndex]).reset();
}
}
casCache = (CAS[]) aCasObjectList;
aCasObjectList = casDataObjects;
}
isCasObject = false;
byteCount = 0;
if (!retry) {
for (int casIndex = 0; casIndex < aCasObjectList.length; casIndex++) {
byteCount = getBytes(aCasObjectList[casIndex]);
container.addBytesIn(byteCount);
}
}
casObjects = aCasObjectList;
long pStart = System.currentTimeMillis();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
logFinest("UIMA_CPM_call_process__FINEST", container, processor);
logFinest("UIMA_CPM_casObjects_class__FINEST", casObjects.getClass().getName());
}
if (!(casObjects instanceof CasData[])) {
maybeLogFinest("UIMA_CPM_expected_casdata__FINEST", casObjects.getClass().getName());
}
maybeLogFinest("UIMA_CPM_call_process__FINEST", container, processor);
casObjects = ((CasDataProcessor) processor).process((CasData[]) casObjects);
maybeLogFinest("UIMA_CPM_call_process_completed__FINEST", container, processor);
long pEnd = System.currentTimeMillis();
container.incrementTotalTime((pEnd - pStart));
if (casObjects != null) {
if (processor instanceof CasDataConsumer) {
container.addBytesOut(byteCount);
} else {
aCasObjectList = casObjects;
if (!retry) {
for (int casIndex = 0; casIndex < aCasObjectList.length; casIndex++) {
byteCount = getBytes(aCasObjectList[casIndex]);
container.addBytesOut(byteCount);
}
}
}
}
pTrTemp.endEvent(container.getName(), "Process", "success");
} else if (processor instanceof CasObjectProcessor) {
maybeLogFinest("UIMA_CPM_casobject_processor__FINEST", container, processor);
maybeLogMemoryFinest();
casList = new CAS[aCasObjectList.length];
for (int casIndex = 0; casIndex < aCasObjectList.length; casIndex++) {
maybeLogFinest("UIMA_CPM_initialize_cas__FINEST", container);
if (aCasObjectList[casIndex] == null) {
if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
logSevere("UIMA_CPM_casobjectlist_is_null__SEVERE",
container.getName(), String.valueOf(casIndex));
}
break;
}
if (isCasObject == false) {
// The following may be true if the CollectionReader is CasData based and this is
// the first CasObject based annotator in the chain.
if (casCache == null || casCache[casIndex] == null) {
casList[casIndex] = null;
while (casList[casIndex] == null) {
maybeLogFinest("UIMA_CPM_get_cas_from_pool__FINEST", container);
// Retrieve a Cas from Cas Pool. Wait max 10 millis for an instance
casList[casIndex] = casPool.getCas(0);
maybeLogFinest("UIMA_CPM_got_cas_from_pool__FINEST", container);
}
if (casList[casIndex] != null) {
maybeLogFinest("UIMA_CPM_call_cas_reset__FINEST", container);
casList[casIndex].reset();
}
} else {
casList[casIndex] = casCache[casIndex];
casList[casIndex].reset();
maybeLogFinest("UIMA_CPM_nullify_cas__FINEST", container);
// Cas is used up
casCache[casIndex] = null;
}
// Convert CasData to CAS
mConverter.casDataToCasContainer((CasData) aCasObjectList[casIndex],
casList[casIndex], true);
} else {
casList[casIndex] = (CAS) aCasObjectList[casIndex];
}
// Set the type from CasData to CasObject. When an error occurs in the proces()
// we need to know what type of object we deal with.
isCasObject = true;
aCasObjectList = casList;
if (processor instanceof AnalysisEngine) {
maybeLogFinest("UIMA_CPM_call_process__FINEST", container, processor);
threadState = 2005;
pTrTemp.aggregate(((AnalysisEngine) processor).process(casList[casIndex]));
maybeLogFinest("UIMA_CPM_call_process_completed__FINEST", container, processor);
} else {
pTrTemp.startEvent(container.getName(), "Process", "");
threadState = 2006;
maybeLogFinest("UIMA_CPM_call_process__FINEST", container, processor);
((CasObjectProcessor) processor).processCas(casList[casIndex]);
maybeLogFinest("UIMA_CPM_call_process_completed__FINEST", container, processor);
pTrTemp.endEvent(container.getName(), "Process", "success");
}
}
}
// Release the CAS and notify listeners if the end of the
// pipeline is reached.
if ((releaseCAS) && (i == (processContainers.size() - 1))) {
// This flag is used to prevent multiple notifications
doneAlready = true;
EntityProcessStatus aEntityProcStatus = new EntityProcessStatusImpl(pTrTemp);
maybeLogFinest("UIMA_CPM_notify_listeners__FINEST");
threadState = 2007;
notifyListeners(aCasObjectList, isCasObject, aEntityProcStatus);
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
logFinest("UIMA_CPM_done_notify_listeners__FINEST");
logFinest("UIMA_CPM_releasing_cases__FINEST",
container.getName(), String.valueOf(releaseCAS), "true");
}
if (casCache != null) {
clearCasCache();
}
// Release CAS's.
if (aCasObjectList instanceof CAS[]) {
cpm.releaseCASes((CAS[]) aCasObjectList);
}
maybeLogFinest("UIMA_CPM_done_releasing_cases__FINEST", container);
}
maybeLogFinest("UIMA_CPM_pipeline_completed__FINEST");
retry = false;
// On successfull processing reset the restart counter. Restart counter determines how
// many times to restart Cas Processor on the same CAS
// Do this conditionally. If the CAS is to be dropped on Exception this restart counter
// scope extends to the entire collection not just one CAS
if (!cpm.dropCasOnException()) {
container.resetRestartCount();
}
} catch (Exception e) {
e.printStackTrace();
if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
logSevere("UIMA_CPM_pipeline_exception__SEVERE", container.getName(), e.getMessage());
maybeLogSevereException(e);
logFinest("UIMA_CPM_pipeline_exception__FINEST",
container.getName(), String.valueOf(container.isPaused()));
}
EntityProcessStatusImpl enProcSt = new EntityProcessStatusImpl(pTrTemp);
enProcSt.addEventStatus("Process", "Failed", e);
threadState = 2008;
notifyListeners(aCasObjectList, isCasObject, enProcSt);
doneAlready = true;
threadState = 2009;
// Check the policy to determine what to do with the CAS on exception. Return the CAS back
// to the pool
// and stop the processing chain if required. The policy for what to do with the CAS on
// exception is
// defined in the CPE descriptor
if (cpm.dropCasOnException()) {
if (casCache != null) {
clearCasCache();
}
if (aCasObjectList instanceof CAS[]) {
cpm.invalidateCASes((CAS[]) aCasObjectList);
}
retry = false; // Dont retry. The CAS has been released
maybeLogWarning("UIMA_CPM_drop_cas__WARNING",
container.getName(), processor.getClass().getName());
} else {
retry = true; // default on Exception
}
// If the container is in pause state dont increment errors since one thread has already
// done this. While the container is in pause state the CPM is attempting to re-connect
// to a failed service. Once that is done, the container is going to be resumed. While
// in pause state ALL threads using the container will be suspended.
if (processor instanceof CasObjectNetworkCasProcessorImpl && container.isPaused()) {
threadState = 2010;
maybeLogFinest("UIMA_CPM_container_paused_do_retry__FINEST", container);
// Do conditional release of CAS instances. The release occurs only if the CasProcessor
// is the last one
// in processing chain. This only releases instances of CAS checked out from the Cas
// Pool. If not done, the
// pool gets depleted and no more work will be done.
releaseCases(casList, (i == (processContainers.size() - 1)), container.getName());
maybeLogFinest("UIMA_CPM_container_paused__FINEST", container);
// Release current Cas Processor before continuing with the next Cas Processor in the
// pipeline
if (processor != null) {
container.releaseCasProcessor(processor);
maybeLogFinest("UIMA_CPM_ok_released_cp__FINEST", container);
processor = null;
}
try {
pTrTemp.endEvent(container.getName(), "Process", "failed");
} catch (Exception exc) {
// Just ignore out-of-phase endEvent exceptions for now.
}
continue;
}
if (pauseContainer(container, e, threadId)) // container.isRemote() &&
// container.isSingleFencedService() &&
// threadId == null )
{
threadState = 2011;
// Pause the container while the CPM is re-connecting to un-managed service
// that is shared by all processing threads
container.pause();
maybeLogFinest("UIMA_CPM_pausing_container__FINEST", container);
threadId = Thread.currentThread().getName();
}
if (processor instanceof CasDataProcessor
|| (processor instanceof CasObjectProcessor && !(processor instanceof AnalysisEngine))) {
try {
pTrTemp.endEvent(container.getName(), "Process", "failed");
} catch (Exception exc) {
// Just ignore out-of-phase endEvent exceptions for now.
}
}
try {
// Increments error counter and determines if any threshold have been reached. If
// the max error rate is reached, the CasProcessor can be configured as follows:
// - terminates CPM when threshold is reached ( method below throws AbortCPMException)
// - disables CasProcessor ( method below throws AbortCasProcessorException )
// - continue, CasProcessor continues to run dispite error
container.incrementCasProcessorErrors(e);
container.releaseCasProcessor(processor);
processor = null;
if (cpm.dropCasOnException()) {
// Cas has already been returned to the CAS pool. The policy requires to stop the
// processing chain for this CAS and
// to get another CAS for processing.
return true;
} else {
container.incrementRetryCount(1);
continue;
}
} // check if the exception should terminate the CPM
catch (KillPipelineException ex) {
try {
handleKillPipeline(container);
processor = null;
} catch (Exception innerE) {
maybeLogWarning("UIMA_CPM_exception_on_pipeline_kill__WARNING",
container.getName(), innerE.getMessage());
}
// finally
// {
// // Throw Original Exception - Killing Pipeline
// throw ex;
// }
throw ex;
} catch (AbortCPMException ex) {
try {
handleAbortCPM(container, processor);
} catch (Exception innerE) {
maybeLogWarning("UIMA_CPM_exception_on_cpm_kill__WARNING",
container.getName(), innerE.getMessage());
}
// finally
// {
// throw new AbortCPMException("Aborting CPM. CasProcessor::" + container.getName() + "
// Configured to Abort the CPM.");
// }
throw ex;
} // check if the CasProcessor is to be disabled due to excessive errors
catch (AbortCasProcessorException ex) {
try {
handleAbortCasProcessor(container, processor);
if (cpm.dropCasOnException()) {
// Cas has already been returned to the CAS pool. The policy requires to stop the
// processing chain for this CAS and
// to get another CAS for processing.
return true;
} else {
// Do conditional release of CAS instances. The release occurs only if the
// CasProcessor is the last one
// in processing chain. This only releases instances of CAS checked out from the Cas
// Pool. If not done, the
// pool gets depleted and no more work will be done.
releaseCases(casList, (i == (processContainers.size() - 1)), container.getName());
}
} catch (ResourceProcessException rpe) {
throw rpe;
} catch (Exception rpe) {
throw new ResourceProcessException(rpe);
}
break; // CasProcessor disabled move on to the next one
} // check if need to redeploy the CasProcessor
catch (ServiceConnectionException ex) {
pTrTemp.startEvent(container.getName(), "Process", "");
try {
threadState = 2012;
handleServiceException(container, processor, pTrTemp, ex);
// processor = null;
if (cpm.dropCasOnException()) {
return true;
} else {
// Increment number of restarts
container.incrementRestartCount(1);
pTrTemp.endEvent(container.getName(), "Process", "success");
continue; // retry the same CAS'es
}
} catch (ResourceProcessException rpe) {
pTrTemp.endEvent(container.getName(), "Process", "failure");
throw rpe;
} catch (ResourceConfigurationException rpe) {
if (rpe.getCause() != null && rpe.getCause() instanceof KillPipelineException) {
try {
handleKillPipeline(container);
processor = null;
} catch (Exception excep) {
// Just log the exception. We are killing the pipeline
maybeLogWarning("UIMA_CPM_exception_on_pipeline_kill__WARNING",
container.getName(), excep.getMessage());
}
}
pTrTemp.endEvent(container.getName(), "Process", "failure");
throw new ResourceProcessException(rpe.getCause());
} catch (Exception rpe) {
pTrTemp.endEvent(container.getName(), "Process", "failure");
throw new ResourceProcessException(rpe);
}
} catch (SkipCasException ex) {
try {
// Release current Cas Processor before continuing with the next Cas Processor in the
// pipeline
if (processor != null) {
container.releaseCasProcessor(processor);
}
handleSkipCasProcessor(container, aCasObjectList,
(i == (processContainers.size() - 1)));
} catch (Exception sEx) {
throw new ResourceProcessException(sEx);
}
processor = null;
if (cpm.dropCasOnException()) {
return true;
} else {
break;
}
} catch (Exception ex) {
if (UIMAFramework.getLogger().isLoggable(Level.FINER)) {
logCPM(Level.FINER, "UIMA_CPM_exception__FINER", new Object[] {ex.getMessage()});
ex.printStackTrace();
}
}
// CAS may have already been dropped (released) due to dropCasOnException policy defined
// in the CPE Descriptor
if (!cpm.dropCasOnException()) {
// Do conditional release of CAS instances. The release occurs only if the CasProcessor
// is the last one
// in processing chain. This only releases instances of CAS checked out from the Cas
// Pool. If not done, the
// pool gets depleted and no more work will be done.
releaseCases(casList, (i == (processContainers.size() - 1)), container.getName());
container.incrementRetryCount(1);
}
} // catch
// Let the container take action if the end-of-batch marker has been reached.
// End-of-batch marker is defined in the cpm configuration for every CasProcessor.
// This marker is defined in the <checkpoint> section of the CasProcessor Definition
// and corresponds to the attribute "batch". If end-of-batch marker is reached the container
// invokes batchProcessComplete() on the CasProcessor
maybeLogFinest("UIMA_CPM_end_of_batch__FINEST", container, processor);
doEndOfBatchProcessing(container, processor, pTrTemp, aCasObjectList);
processor = null;
} while (retry); // retry loop
if (processor != null) {
maybeLogFinest("UIMA_CPM_release_cp__FINEST", container, processor, casCache);
container.releaseCasProcessor(processor);
processor = null;
maybeLogFinest("UIMA_CPM_ok_release_cp__FINEST", container, processor, casCache);
}
} // end of: For All CasProcessors
try {
postAnalysis(aCasObjectList, isCasObject, casObjects, pTrTemp, doneAlready);
} catch (ResourceProcessException rpe) {
throw rpe;
} catch (Exception rpe) {
throw new ResourceProcessException(rpe);
}
maybeLogFinest("UIMA_CPM_pipeline_completed__FINEST");
return true;
}
/**
 * Finishes the handling of a set of artifacts after the pipeline has run: optionally notifies the
 * registered listeners and, when an output queue is configured, enqueues the artifacts wrapped in
 * a {@code WorkUnit}.
 *
 * @param aCasObjectList
 *          artifacts that were just analyzed
 * @param isCasObject
 *          handed through to {@code notifyListeners}; tells it whether the artifacts are CAS
 *          instances (as opposed to CasData)
 * @param casObjects
 *          if this is a {@code CasData[]} and no output queue exists, its entries and the entries
 *          of {@code aCasObjectList} at the same indexes are cleared (set to null) on exit
 * @param aProcessTr
 *          ProcessTrace wrapped into the status object that is passed to the listeners
 * @param doneAlready
 *          true when the listeners have already been notified for these artifacts; in that case
 *          no notification is sent from here
 *
 * @throws Exception
 *           whatever is raised while notifying listeners or enqueuing the work unit
 */
private void postAnalysis(Object[] aCasObjectList, boolean isCasObject, Object[] casObjects,
        ProcessTrace aProcessTr, boolean doneAlready) throws Exception {
  try {
    maybeLogFinest("UIMA_CPM_pipeline_completed__FINEST");

    // Tell the listeners that the entity has been processed, unless that already happened.
    if (!doneAlready && notifyListeners) {
      maybeLogFinest("UIMA_CPM_notify_listeners__FINEST");
      threadState = 2013;
      EntityProcessStatus status = new EntityProcessStatusImpl(aProcessTr);
      notifyListeners(aCasObjectList, isCasObject, status);
      threadState = 2014;
      maybeLogFinest("UIMA_CPM_done_notify_listeners__FINEST");
    }

    // Enqueue the CASes. A CPM that was stopped by a hard kill must not receive more work.
    boolean mayEnqueue = outputQueue != null
            && (cpm.isRunning() || (!cpm.isRunning() && !cpm.isHardKilled()));
    if (mayEnqueue) {
      maybeLogFinestWorkQueue("UIMA_CPM_add_cas_to_queue__FINEST", outputQueue);
      WorkUnit unit = new WorkUnit(aCasObjectList);
      if (casCache != null && casCache[0] != null) {
        unit.setCas(casCache);
      }
      threadState = 2015;
      // enqueue() is expected to wake up the consumer itself, so no explicit notifyAll() here
      outputQueue.enqueue(unit);
      casCache = null;
    }
  } finally {
    // Without an output queue nobody downstream owns the CasData objects: drop the references.
    if (outputQueue == null && casObjects instanceof CasData[]) {
      if (System.getProperty("DEBUG_RELEASE") != null
              && UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        logFinest("UIMA_CPM_done_with_cas__FINEST", String.valueOf(Runtime.getRuntime().freeMemory() / 1024));
      }
      for (int idx = 0; idx < casObjects.length; idx++) {
        maybeLogFinest("UIMA_CPM_show_local_cache__FINEST", casCache);
        casObjects[idx] = null;
        aCasObjectList[idx] = null;
        maybeLogFinest("UIMA_CPM_show_local_cache__FINEST", casCache);
      }
      if (System.getProperty("DEBUG_RELEASE") != null
              && UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        logFinest("UIMA_CPM_show_total_memory__FINEST", String.valueOf(Runtime.getRuntime().freeMemory() / 1024));
      }
    }
  }
}
/**
 * Runs the end-of-batch step for one Cas Processor. The decision is delegated to the container
 * through {@code isEndOfBatch}; the step is recorded in the process trace as an "End of Batch"
 * event. Exceptions raised by the container are logged and recorded as a failed event instead of
 * being propagated. The Cas Processor, if not null, is always handed back to the container.
 *
 * @param aContainer
 *          container performing the end-of-batch check
 * @param aProcessor
 *          Cas Processor passed to the container; released to the container afterwards
 * @param aProcessTr
 *          process trace receiving the start/end events
 * @param aCasObjectList
 *          CASes just analyzed; only the length of the array is used here
 */
private void doEndOfBatchProcessing(ProcessingContainer aContainer, CasProcessor aProcessor,
        ProcessTrace aProcessTr, Object[] aCasObjectList) {
  final String containerName = aContainer.getName();
  try {
    aProcessTr.startEvent(containerName, "End of Batch", "");
    aContainer.isEndOfBatch(aProcessor, aCasObjectList.length);
    aProcessTr.endEvent(containerName, "End of Batch", "success");
    maybeLogFinest("UIMA_CPM_end_of_batch_completed__FINEST", aContainer);
  } catch (Exception batchFailure) {
    maybeLogSevere("UIMA_CPM_end_of_batch_exception__SEVERE",
            containerName, batchFailure.getMessage());
    aProcessTr.endEvent(containerName, "End of Batch", "failed");
  } finally {
    // The processor goes back to its container before the pipeline moves on to the next one.
    if (aProcessor != null) {
      aContainer.releaseCasProcessor(aProcessor);
      maybeLogFinest("UIMA_CPM_ok_released_cp__FINEST", aContainer);
    }
  }
}
/**
 * Handles a skipped set of CASes: resumes the container if it is paused and adds the number of
 * skipped artifacts to the container's abort count. Unless the drop-CAS-on-exception policy is
 * active (in which case the CAS has already been dealt with), the aborted CASes are logged by the
 * container and then conditionally released via {@code releaseCases}.
 *
 * <p>
 * A failure while releasing the CASes is always reported to the caller, regardless of the
 * configured log level. If logging the aborted CASes failed as well, that first failure is the
 * one thrown and the release failure is attached to it as a suppressed exception.
 *
 * @param aContainer
 *          container managing the Cas Processor that caused the skip
 * @param aCasObjectList
 *          the artifacts being skipped
 * @param isLastCP
 *          true if the Cas Processor is the last one in the processing chain; forwarded to
 *          {@code releaseCases}
 * @throws Exception
 *           if logging the aborted CASes or releasing them fails
 */
private void handleSkipCasProcessor(ProcessingContainer aContainer, Object[] aCasObjectList,
        boolean isLastCP) throws Exception {
  maybeLogFinest("UIMA_CPM_skipping_cas__FINEST", aContainer);
  if (aContainer.isPaused()) {
    aContainer.resume();
  }
  aContainer.incrementAbortCount(aCasObjectList.length);

  // CAS may have already been dropped (released) due to the dropCasOnException policy defined in
  // the CPE descriptor. Nothing left to do in that case.
  if (cpm.dropCasOnException()) {
    return;
  }

  Exception primaryFailure = null;
  try {
    aContainer.logAbortedCases(aCasObjectList);
  } catch (Exception e) {
    // Remember the failure so that a later release problem does not replace it.
    primaryFailure = e;
    throw e;
  } finally {
    // Conditional release of CAS instances: it only happens if the CasProcessor is the last one
    // in the processing chain. Without it the CAS pool would eventually be depleted.
    try {
      releaseCases(casList, isLastCP, aContainer.getName());
    } catch (Exception releaseFailure) {
      if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
        logSevere("UIMA_CPM_exception_releasing_cas__SEVERE", aContainer.getName());
        maybeLogSevereException(releaseFailure);
      }
      // Propagation must not depend on the log level.
      if (primaryFailure == null) {
        throw releaseFailure;
      }
      primaryFailure.addSuppressed(releaseFailure);
    }
  }
}
/**
 * Reacts to a failure of a remote invocation. The failed Cas Processor is told that the collection
 * is complete (network processors only), added to the container's list of failed Cas Processors
 * and the container is redeployed through the CPM.
 *
 * <p>
 * For a remote container backed by a single fenced service, only the thread whose name matches
 * {@code threadId} performs the redeploy; it then resumes the container and clears
 * {@code threadId}. Every other thread returns without redeploying.
 *
 * @param aContainer
 *          container managing the Cas Processor that failed
 * @param aProcessor
 *          the failed Cas Processor
 * @param aProcessTr
 *          ProcessTrace object receiving the events
 * @param ex
 *          source exception (not used by this method)
 *
 * @throws Exception
 *           if notifying the processor or redeploying the container fails
 */
private void handleServiceException(ProcessingContainer aContainer, CasProcessor aProcessor,
        ProcessTrace aProcessTr, Exception ex) throws Exception {
  if (aProcessor instanceof NetworkCasProcessorImpl) {
    ((NetworkCasProcessorImpl) aProcessor).collectionProcessComplete(aProcessTr);
  }
  // Remember the failed Cas Processor; the list is used later to re-initialize it.
  ((ProcessingContainer_Impl) aContainer).failedCasProcessorList.add(aProcessor);

  // A remote (unmanaged) service shared by all proxies may be restarted by one thread only.
  final boolean sharedFencedService = aContainer.isRemote() && aContainer.isSingleFencedService();
  if (sharedFencedService && !Thread.currentThread().getName().equals(threadId)) {
    // Another thread owns the restart of the shared service.
    return;
  }

  maybeLogFinest("UIMA_CPM_service_connection_exception__FINEST", aContainer, aProcessor);
  aProcessTr.startEvent(aContainer.getName(), "Process", "");
  maybeLogFinest("UIMA_CPM_redeploy_cp__FINEST", aContainer, aProcessor);
  // Reconnect the CPM to the CasProcessor running in fenced mode
  cpm.redeployAnalysisEngine(aContainer);
  if (sharedFencedService) {
    // This thread initiated the restart: let the other threads continue.
    aContainer.resume();
    threadId = null;
  }
  maybeLogFinest("UIMA_CPM_redeploy_cp_done__FINEST", aContainer, aProcessor);
}
/**
 * Disables the current Cas Processor: resumes the container if it is paused, marks the container
 * as disabled and returns the Cas Processor instance (if any) to the container.
 *
 * @param aContainer
 *          the container that manages the current Cas Processor
 * @param aProcessor
 *          the Cas Processor instance to hand back to the container; may be null
 * @throws Exception
 *           if one of the container operations fails
 */
private void handleAbortCasProcessor(ProcessingContainer aContainer, CasProcessor aProcessor)
        throws Exception {
  maybeLogFinest("UIMA_CPM_disable_due_to_action__FINEST", aContainer);

  final boolean paused = aContainer.isPaused();
  if (paused) {
    aContainer.resume();
  }
  aContainer.setStatus(Constants.CAS_PROCESSOR_DISABLED);

  // Hand the instance back before the pipeline continues with the next Cas Processor.
  if (aProcessor != null) {
    aContainer.releaseCasProcessor(aProcessor);
  }
  maybeLogFinest("UIMA_CPM_disabled_cp__FINEST", aContainer);
}
/**
 * Prepares the termination of the CPM and then signals it by throwing an
 * {@link AbortCPMException}. The container is resumed if paused, marked as killed, and the Cas
 * Processor is returned to it. Unless the drop-CAS-on-exception policy is active, the CASes are
 * released as well; a failure during that release is only logged so the abort can proceed.
 *
 * @param aContainer
 *          the container that manages the current Cas Processor
 * @param aProcessor
 *          the Cas Processor instance that is returned to the container
 * @throws Exception
 *           always; on the normal path this is the {@link AbortCPMException}
 */
private void handleAbortCPM(ProcessingContainer aContainer, CasProcessor aProcessor)
        throws Exception {
  if (aContainer.isPaused()) {
    aContainer.resume();
  }
  aContainer.setStatus(Constants.CAS_PROCESSOR_KILLED);
  maybeLogSevere("UIMA_CPM_abort_cpm__SEVERE", aContainer.getName());
  aContainer.releaseCasProcessor(aProcessor);

  // With dropCasOnException the CAS may already have been returned to the CAS pool.
  final boolean casesStillHeld = !cpm.dropCasOnException();
  if (casesStillHeld) {
    try {
      // Force the release: the CPM is terminating.
      releaseCAS = true;
      releaseCases(casList, true, aContainer.getName());
    } catch (Exception releaseFailure) {
      // Do not let a release problem get in the way of the abort.
      maybeLogSevere("UIMA_CPM_exception_on_cpm_kill__WARNING", aContainer.getName(), releaseFailure.getMessage());
      maybeLogSevereException(releaseFailure);
    }
  }

  final Object[] messageArgs = new Object[] { Thread.currentThread().getName(),
      aContainer.getName() };
  throw new AbortCPMException(CpmLocalizedMessage.getLocalizedMessage(
          CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_EXP_configured_to_abort__WARNING",
          messageArgs));
}
/**
 * Prepares this pipeline for termination: resumes the container if it is paused, marks it as
 * killed and releases all CASes held by this thread (the cached ones first, then the current CAS
 * list).
 *
 * @param aContainer
 *          the container that manages the current Cas Processor
 * @throws Exception
 *           if a container operation or the CAS release fails
 */
private void handleKillPipeline(ProcessingContainer aContainer) throws Exception {
  final boolean paused = aContainer.isPaused();
  if (paused) {
    aContainer.resume();
  }
  aContainer.setStatus(Constants.CAS_PROCESSOR_KILLED);
  maybeLogFinest("UIMA_CPM_kill_pipeline__FINEST", aContainer);

  // Force the release below even if this thread would normally not release CASes.
  releaseCAS = true;
  if (casCache != null) {
    // CASes sitting in the local cache go back first
    cpm.releaseCASes(casCache);
    casCache = null;
  }
  releaseCases(casList, true, aContainer.getName());
}
/**
 * Decides whether the container should be paused. Pausing a container effectively pauses ALL Cas
 * Processors it manages. This is needed when several pipelines share one service: if the service
 * dies, only one thread should initiate the restart and no invocations should be made meanwhile.
 *
 * <p>
 * The answer is true only if the container is remote, is backed by a single fenced service, the
 * cause of the given exception is a {@code ServiceConnectionException} and no thread id has been
 * recorded yet.
 *
 * @param aContainer
 *          the container that manages the current Cas Processor
 * @param aException
 *          the exception raised during processing; its cause is inspected
 * @param aThreadId
 *          recorded name of the thread handling a restart, or null if there is none
 *
 * @return true if the container should be paused
 */
private boolean pauseContainer(ProcessingContainer aContainer, Exception aException,
        String aThreadId) {
  return aContainer.isRemote()
          && aContainer.isSingleFencedService()
          && aException.getCause() instanceof ServiceConnectionException
          && aThreadId == null;
}
/**
 * Conditionally returns CASes to the CAS pool. Nothing is released unless this thread is flagged
 * to release CASes ({@code releaseCAS}) and the Cas Processor is the last one in the processing
 * chain. Only a {@code CAS[]} is released; for any other type just its class name is logged.
 * CASes held in the local cache are released before the given list.
 *
 * @param aCasList
 *          the CASes to release; may be null, in which case nothing happens
 * @param lastProcessor
 *          true if the current Cas Processor is the last one in the chain
 * @param aName
 *          name of the current container, used for logging only
 */
private void releaseCases(Object aCasList, boolean lastProcessor, String aName) {
  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    logFinest("UIMA_CPM_releasing_cases__FINEST",
            aName, String.valueOf(releaseCAS), String.valueOf(lastProcessor));
  }

  final boolean nothingToDo = aCasList == null || !releaseCAS || !lastProcessor;
  if (nothingToDo) {
    return;
  }

  if (!(aCasList instanceof CAS[])) {
    maybeLogFinest("UIMA_CPM_casobject_class__FINEST", aName, aCasList.getClass().getName());
    return;
  }

  if (casCache != null) {
    // Cached CASes go back to the pool first
    cpm.releaseCASes(casCache);
    casCache = null;
  }
  cpm.releaseCASes((CAS[]) aCasList);
  maybeLogFinest("UIMA_CPM_done_releasing_cases__FINEST", aName);
}
/**
 * Notifies the listeners that the pipeline has finished processing the current set of CASes. An
 * array is unpacked and the listeners are notified once per element; anything else is passed on
 * as a single artifact.
 *
 * @param aCas
 *          either an array of artifacts or a single artifact
 * @param isCasObject
 *          true if the artifact(s) are of type CAS, false otherwise
 * @param aEntityProcStatus
 *          status object that may contain exceptions and trace
 */
protected void notifyListeners(Object aCas, boolean isCasObject,
        EntityProcessStatus aEntityProcStatus) {
  if (!(aCas instanceof Object[])) {
    doNotifyListeners(aCas, isCasObject, aEntityProcStatus);
    return;
  }
  for (Object artifact : (Object[]) aCas) {
    doNotifyListeners(artifact, isCasObject, aEntityProcStatus);
  }
}
/**
 * Notifies all configured listeners about one processed artifact and makes sure each listener
 * receives the representation it expects: {@code CasDataStatusCallbackListener}s get a CasData,
 * {@code StatusCallbackListener}s get a CAS. The artifact is converted when its type does not
 * match.
 *
 * <p>
 * Every listener starts from the artifact as passed in, so a conversion performed for one
 * listener never affects the next one. A CAS taken from the pool for a conversion is returned to
 * the pool even if the conversion or the listener callback throws.
 *
 * @param aCas
 *          artifact to pass to the listeners
 * @param isCasObject
 *          true if the artifact is of type CAS, false if it is a CasData
 * @param aEntityProcStatus
 *          status object containing exceptions and trace info
 */
protected void doNotifyListeners(Object aCas, boolean isCasObject,
        EntityProcessStatus aEntityProcStatus) {
  // Notify ALL listeners
  for (int j = 0; j < statusCbL.size(); j++) {
    BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
    // Reset per listener: reusing a copy converted for a previous listener would make the casts
    // below fail as soon as more than one listener is registered.
    Object casObjectCopy = aCas;

    if (statCL instanceof CasDataStatusCallbackListener) {
      if (isCasObject) {
        // The artifact is a CAS, this listener needs a CasData
        casObjectCopy = mConverter.casContainerToCasData((CAS) casObjectCopy);
      }
      ((CasDataStatusCallbackListener) statCL).entityProcessComplete((CasData) casObjectCopy,
              aEntityProcStatus);
    } else if (statCL instanceof StatusCallbackListener) {
      boolean casFromPool = false;
      try {
        if (!isCasObject) {
          // The artifact is a CasData, this listener needs a CAS. Prefer the locally cached CAS
          // as conversion target and fall back to the CAS pool.
          conversionCas = null;
          if (casCache != null && casCache[0] != null) {
            conversionCas = casCache[0];
          } else {
            while (conversionCas == null) {
              conversionCas = casPool.getCas(0);
            }
            casFromPool = true;
          }
          try {
            mConverter.casDataToCasContainer((CasData) casObjectCopy, conversionCas, true);
          } catch (CollectionException e) {
            logWarning("UIMA_CPM_exception_converting_CAS__WARNING");
          }
          casObjectCopy = conversionCas;
        }
        // Notify the listener that the Cas has been processed
        CPMEngine.callEntityProcessCompleteWithCAS((StatusCallbackListener) statCL,
                (CAS) casObjectCopy, aEntityProcStatus);
      } finally {
        // Runs on failure too, otherwise a CAS checked out of the pool above would be lost.
        if (conversionCas != null) {
          if (casFromPool) {
            conversionCasArray[0] = conversionCas;
            cpm.releaseCASes(conversionCasArray);
          }
          conversionCas = null;
          if (casCache != null && casCache[0] != null) {
            casCache[0].reset();
          }
        }
      }
    }
  }
}
/**
 * Tells this thread whether it is responsible for releasing a CAS once processing is finished.
 * Called by the CPMEngine during setup. Typically the Cas Consumer thread releases the CAS, but
 * in configurations without Cas Consumers the processing pipeline may do so.
 *
 * @param aFlag
 *          true if this thread should release a CAS when analysis is complete
 */
public void setReleaseCASFlag(boolean aFlag) {
  this.releaseCAS = aFlag;
}
/**
 * Stops all Cas Processors that are part of this processing unit. For each container the final
 * status is set (killed, or completed unless it was disabled), its deployer - if any - undeploys
 * the Cas Processor instances and the container is destroyed. The shutdown of every container is
 * recorded as an event in a temporary process trace that is merged into the processing unit's
 * trace when one is available. Failures while stopping a container are logged as warnings.
 *
 * @param kill
 *          true if the CPE has been stopped before finishing processing during external stop
 */
public void stopCasProcessors(boolean kill) {
  maybeLogFinest("UIMA_CPM_stop_containers__FINEST");
  // Walk over all containers; each one is handled while holding its monitor.
  for (int i = 0; processContainers != null && i < processContainers.size(); i++) {
    final ProcessingContainer container = (ProcessingContainer) processContainers.get(i);
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      logFinest("UIMA_CPM_show_container_time__FINEST",
              container.getName(), String.valueOf(container.getTotalTime()));
    }
    synchronized (container) {
      // KILLED: explicit kill, or the CPM stopped early while the CasProcessor was still
      // READY/RUNNING. Otherwise COMPLETED, unless the CasProcessor was disabled on the way.
      if (kill || (!cpm.isRunning() && isProcessorReady(container.getStatus()))) {
        maybeLogFinest("UIMA_CPM_kill_cp__FINEST", container);
        container.setStatus(Constants.CAS_PROCESSOR_KILLED);
      } else if (container.getStatus() != Constants.CAS_PROCESSOR_DISABLED) {
        container.setStatus(Constants.CAS_PROCESSOR_COMPLETED);
      }
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        logFinest("UIMA_CPM_container_status__FINEST",
                container.getName(), String.valueOf(container.getStatus()));
      }

      final ProcessTrace shutdownTrace = new ProcessTrace_impl(cpm.getPerformanceTuningSettings());
      shutdownTrace.startEvent(container.getName(), "End of Batch", "");
      try {
        final CasProcessorDeployer deployer = container.getDeployer();
        if (deployer != null) {
          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
            logFinest("UIMA_CPM_undeploy_cp_instances__FINEST", container.getName(), deployer.getClass().getName());
          }
          deployer.undeploy();
        }
        container.destroy();
      } catch (Exception stopFailure) {
        logWarning("UIMA_CPM_exception_during_cp_stop__WARNING", container.getName(), stopFailure.getMessage());
      } finally {
        shutdownTrace.endEvent(container.getName(), "End of Batch", "");
        if (processingUnitProcessTrace != null) {
          this.processingUnitProcessTrace.aggregate(shutdownTrace);
        }
      }
    }
  }
}
/**
 * Tells whether the configured number of entities to process has been reached. A limit of -1
 * means the count never ends processing, a limit of 0 means the end is reached immediately.
 *
 * @param aCount
 *          running total of documents processed so far
 *
 * @return true if the limit has been reached, false otherwise
 */
protected boolean endOfProcessingReached(long aCount) {
  if (numToProcess == -1) {
    return false;
  }
  return numToProcess == 0 || aCount >= numToProcess;
}
/**
 * Passes the artifact to {@code showMetadata} if it is an array; any other artifact is ignored.
 *
 * @param anArtifact
 *          the artifact to inspect
 */
protected void process(Object anArtifact) {
  if (anArtifact instanceof Object[]) {
    showMetadata((Object[]) anArtifact);
  }
}
/**
 * Hook invoked by {@code process} with an array of artifacts. This implementation does nothing.
 *
 * @param aCasList -
 *          the artifacts handed over by {@code process}
 */
protected void showMetadata(Object[] aCasList) {
}
/**
 * Checks whether a CasProcessor status means "available for processing".
 *
 * @param aStatus
 *          status code to check
 * @return true if the status is either READY or RUNNING
 */
protected boolean isProcessorReady(int aStatus) {
  return aStatus == Constants.CAS_PROCESSOR_READY || aStatus == Constants.CAS_PROCESSOR_RUNNING;
}
/**
 * Returns the size of the given CAS object. Only CasData is supported; for any other type, or if
 * the size cannot be determined, 0 is returned.
 *
 * @param aCas
 *          artifact to get the size for
 *
 * @return the byte count reported for a CasData, otherwise 0
 */
protected long getBytes(Object aCas) {
  if (!(aCas instanceof CasData)) {
    return 0;
  }
  try {
    return DATACasUtils.getByteCount((CasData) aCas);
  } catch (Exception countFailure) {
    countFailure.printStackTrace();
    return 0;
  }
}
/**
 * Sets the CAS pool used by this processing unit.
 *
 * @param aPool
 *          the CAS pool to use
 */
public void setCasPool(CPECasPool aPool) {
  this.casPool = aPool;
}
/**
 * Applies the container's filter to a set of artifacts. Filtering is only done for CasData
 * instances: the container is asked whether it wants to process them, and if not, the filtered
 * count is increased and the skipped artifacts are logged by the container.
 *
 * @param aContainer
 *          container whose filter is consulted
 * @param isCasObject
 *          true if the artifacts are CAS instances; these are never filtered here
 * @param aCasObjectList
 *          the artifacts to check
 * @return true if the artifacts were filtered out and must be skipped for this container
 */
private boolean filterOutTheCAS(ProcessingContainer aContainer, boolean isCasObject,
        Object[] aCasObjectList) {
  final boolean accepted = isCasObject || aContainer.processCas(aCasObjectList);
  if (accepted) {
    return false;
  }
  maybeLogFinest("UIMA_CPM_skip_CAS__FINEST", aContainer);
  aContainer.incrementFilteredCount(aCasObjectList.length);
  aContainer.logAbortedCases(aCasObjectList);
  return true; // this set of CASes is skipped
}
/**
 * Checks, while holding the container's monitor, whether the container is currently unable to
 * process, i.e. its status is neither READY nor RUNNING. Cas Processors may be disabled during
 * processing.
 *
 * @param aContainer
 *          container to check
 * @return true if the container must be skipped
 */
private boolean containerDisabled(ProcessingContainer aContainer) {
  synchronized (aContainer) {
    if (isProcessorReady(aContainer.getStatus())) {
      return false;
    }
    maybeLogFinest("UIMA_CPM_container_not_ready__FINEST", aContainer);
    return true;
  }
}
/* **************************************************************** */
/**
* An alternate processing loop designed for the single-threaded CPM.
*
* @param aCasObjectList -
* a list of CASes to analyze
* @param pTrTemp -
* process trace where statistics are added during analysis
* @throws Exception -
* @return -
*/
protected boolean analyze(Object[] aCasObjectList, ProcessTrace pTrTemp) throws Exception // throws
// ResourceProcessException,
// IOException,
// CollectionException,
// AbortCPMException
{
long t1 = 0;
maybeLogFinest("UIMA_CPM_start_analysis__FINEST");
// String lastDocId = "";
CasProcessor processor = null;
// This is used to hold an index of the current CasObject
boolean doneAlready = false;
// If there are no CASes in the list, return false since there is nothing else to do
if (aCasObjectList == null || aCasObjectList[0] == null) {
maybeLogSevere("UIMA_CPM_invalid_cas_reference__SEVERE");
return false;
}
Object[] casObjects = null;
// Determine if the Cas'es contained in the CasList are of type CAS. Samples the first CAS in
// the list.
// The list contains CASes of the same type ( either CasData or CAS ). Mixed model not
// supported.
boolean isCasObject = aCasObjectList[0] instanceof CAS;
// String docid = "";
maybeLogFinest("UIMA_CPM_entering_pipeline__FINEST");
ProcessingContainer container = null;
String containerName = "";
// *******************************************
// ** P R O C E S S I N G P I P E L I N E **
// *******************************************
// Send Cas Object through the processing pipeline.
for (int i = 0; processContainers != null && i < processContainers.size(); i++) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
logFinest("UIMA_CPM_retrieve_container__FINEST", String.valueOf(i));
}
// Retrieve the container. Container manages one or more instances of CAS Processor
container = (ProcessingContainer) processContainers.get(i);
// container can be disabled in multi-processing pipeline configurations. The container is
// disabled
// when one of the processing threads is in the process of restarting/reconnecting to a shared
// fenced service. Shared, meaning that all processing pipelines use the same service for
// invocations.
// Container must be disabled to prevent concurrent restarts.
if (containerDisabled(container) || filterOutTheCAS(container, isCasObject, aCasObjectList)) {
continue;
}
containerName = container.getName();
// Flag controlling do-while loop that facilitates retries. Retries are defined in the
// CasProcessor configuration.
boolean retry = false;
do // Retry
{
try {
if (System.getProperty("SHOW_MEMORY") != null) {
maybeLogMemoryFinest();
}
maybeLogFinest("UIMA_CPM_checkout_cp_from_container__FINEST", container);
threadState = 2004;
t1 = System.currentTimeMillis();
// Get the CasProcessor from the pool managed by the container
processor = container.getCasProcessor();
timer01 += (System.currentTimeMillis() - t1);
timer06 = ((ProcessingContainer_Impl) container).getFetchTime();
if (processor == null) {
maybeLogSevere("UIMA_CPM_checkout_null_cp_from_container__SEVERE", containerName);
throw new ResourceProcessException(CpmLocalizedMessage.getLocalizedMessage(
CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_EXP_invalid_component_reference__WARNING", new Object[] {
Thread.currentThread().getName(), "CasProcessor", "NULL" }), null);
}
// Check to see if the CasProcessor is available for processing
// The CasProcessor may have been disabled due to excessive errors and error policy
// defined
// in the CPE descriptor.
if (!isProcessorReady(container.getStatus())) {
maybeLogFinest("UIMA_CPM_container_not_ready__FINEST", container);
if (container.getStatus() == Constants.CAS_PROCESSOR_KILLED) {
container.releaseCasProcessor(processor);
// Another thread has initiated CPM Abort. That Thread has already notified
// the application of the Abort. Here we just return as the CPM has been
// killed most likely due to excessive errors.
return false;
}
// Skip any CasProcessor that is not ready to process
break;
}
maybeLogFinest("UIMA_CPM_checkedout_cp_from_container__FINEST", container, processor);
t1 = System.currentTimeMillis();
// ************************* P E R F O R M A N A L Y S I S *************************
if (processor instanceof CasDataProcessor) {
invokeCasDataCasProcessor(container, processor, aCasObjectList, pTrTemp, isCasObject,
retry);
isCasObject = false;
} else if (processor instanceof CasObjectProcessor) {
invokeCasObjectCasProcessor(container, processor, aCasObjectList, pTrTemp, isCasObject);
isCasObject = true;
}
timer02 += (System.currentTimeMillis() - t1);
maybeLogFinest("UIMA_CPM_analysis_successfull__FINEST", container, processor);
retry = false;
// On successfull processing reset the restart counter. Restart counter determines how
// many times to restart Cas Processor on the same CAS
// Do this conditionally. If the CAS is to be dropped on Exception this restart counter
// scope extends to the entire collection not just one CAS
if (!cpm.dropCasOnException()) {
container.resetRestartCount();
}
} catch (Exception e) {
retry = handleErrors(e, container, processor, pTrTemp, aCasObjectList, isCasObject);
if (cpm.dropCasOnException()) {
retry = false; // override
return false; // Dont pass the CAS to the CasConsumer. CAS has been dropped
}
} finally {
if (retry == false) {
maybeLogFinest("UIMA_CPM_end_of_batch__FINEST", container, processor);
if (isProcessorReady(container.getStatus())) {
t1 = System.currentTimeMillis();
// Let the container take action if the end-of-batch marker has been reached.
// End-of-batch marker is defined in the cpm configuration for every CasProcessor.
// This marker is defined in the <checkpoint> section of the CasProcessor Definition
// and corresponds to the attribute "batch". If end-of-batch marker is reached the
// container
// invokes batchProcessComplete() on the CasProcessor
doEndOfBatch(container, processor, pTrTemp, aCasObjectList.length);
timer03 += (System.currentTimeMillis() - t1);
}
} else {
container.incrementRetryCount(1);
}
// Release current Cas Processor before continuing with the next Cas Processor in the
// pipeline
if (processor != null) {
maybeLogFinest("UIMA_CPM_release_cp__FINEST", container, processor, casCache);
t1 = System.currentTimeMillis();
doReleaseCasProcessor(container, processor);
timer04 += (System.currentTimeMillis() - t1);
maybeLogFinest("UIMA_CPM_ok_release_cp__FINEST", container, processor, casCache);
processor = null;
}
}
} while (retry);
} // end of: For All CasProcessors
t1 = System.currentTimeMillis();
postAnalysis(aCasObjectList, isCasObject, casObjects, pTrTemp, doneAlready);
timer05 += (System.currentTimeMillis() - t1);
casObjects = null;
return true;
}
/**
 * Hands a CasProcessor back to its container by calling
 * {@code releaseCasProcessor()}. Does nothing when either argument is null.
 *
 * @param aContainer - container the CasProcessor belongs to
 * @param aCasProcessor - CasProcessor instance to release
 */
private void doReleaseCasProcessor(ProcessingContainer aContainer, CasProcessor aCasProcessor) {
  if (aCasProcessor == null || aContainer == null) {
    return;
  }
  aContainer.releaseCasProcessor(aCasProcessor);
}
/**
 * Asks the container to evaluate its end-of-batch condition for the given CasProcessor.
 * Any exception raised by the container is logged at SEVERE (message text only) and is not
 * propagated to the caller.
 *
 * @param aContainer - container managing the CasProcessor
 * @param aProcessor - CasProcessor passed to the container's end-of-batch check
 * @param aProcessTr - process trace; not used by this method
 * @param howManyCases - number of CASes handled in the current pass
 */
private void doEndOfBatch(ProcessingContainer aContainer, CasProcessor aProcessor,
        ProcessTrace aProcessTr, int howManyCases) {
  // Resolved before the try block so the name is available to the error log below.
  final String nameOfContainer = aContainer.getName();
  try {
    aContainer.isEndOfBatch(aProcessor, howManyCases);
    maybeLogFinest("UIMA_CPM_end_of_batch_completed__FINEST", aContainer);
  } catch (Exception failure) {
    maybeLogSevere("UIMA_CPM_end_of_batch_exception__SEVERE", nameOfContainer,
            failure.getMessage());
  }
}
/**
 * Main routine that handles errors occurring in the processing loop. It logs the failure,
 * notifies the registered listeners, applies the drop-CAS-on-exception policy, and then asks
 * the container to account for the error. The container's reaction (signalled through
 * exceptions) decides whether the CPM is aborted, the CasProcessor is disabled, a service is
 * recovered, or the CAS is skipped.
 *
 * @param e -
 *          exception in the main processing loop
 * @param aContainer -
 *          current container of the Cas Processor
 * @param aProcessor -
 *          current Cas Processor; may be null when the failure being handled is that no
 *          CasProcessor was available
 * @param aProcessTrace -
 *          an object containing stats for this processing loop
 * @param aCasObjectList -
 *          list of CASes being analyzed
 * @param isCasObject -
 *          determines type of CAS in the aCasObjectList ( CasData or CasObject)
 * @return true if the caller should retry the current Cas Processor, false otherwise
 * @throws Exception -
 *           AbortCPMException when the container signals that the CPM must terminate;
 *           ResourceProcessException when service recovery or skip-CAS handling fails;
 *           any other exception raised by the helpers invoked here propagates unchanged
 */
private boolean handleErrors(Throwable e, ProcessingContainer aContainer,
        CasProcessor aProcessor, ProcessTrace aProcessTrace, Object[] aCasObjectList,
        boolean isCasObject) throws Exception {
  boolean retry = true;
  String containerName = aContainer.getName();
  // aProcessor can legitimately be null here (the processing loop throws when it obtains a
  // null CasProcessor and routes that exception to this method). Resolve the name defensively
  // so the original failure is not replaced by a NullPointerException.
  String processorName = (aProcessor == null) ? "NULL" : aProcessor.getClass().getName();
  e.printStackTrace();
  maybeLogSevereException(e);
  maybeLogSevere("UIMA_CPM_handle_exception__SEVERE", containerName, processorName,
          e.getMessage());
  EntityProcessStatusImpl enProcSt = new EntityProcessStatusImpl(aProcessTrace);
  enProcSt.addEventStatus("Process", "Failed", e);
  threadState = 2008;
  // Send exception notifications to all registered listeners
  notifyListeners(aCasObjectList, isCasObject, enProcSt);
  threadState = 2009;
  // Check the policy to determine what to do with the CAS on exception. Return the CAS back to
  // the pool and stop the processing chain if required. The policy for what to do with the CAS
  // on exception is defined in the CPE descriptor
  if (cpm.dropCasOnException()) {
    if (casCache != null) {
      clearCasCache();
    }
    logWarning("UIMA_CPM_drop_cas__WARNING", containerName, processorName);
    // Release CASes and notify listeners
    // NOTE(review): this cast assumes the array was created as CAS[] - confirm for the
    // CasData case.
    cpm.invalidateCASes((CAS[]) aCasObjectList);
    retry = false; // Dont retry. The CAS has been released
  }
  // If the container is in pause state dont increment errors since one thread has already
  // done this. While the container is in pause state the CPM is attempting to re-connect
  // to a failed service. Once that is done, the container is going to be resumed. While
  // in pause state ALL threads using the container will be suspended.
  if (aProcessor instanceof CasObjectNetworkCasProcessorImpl && aContainer.isPaused()) {
    threadState = 2010;
    maybeLogFinest("UIMA_CPM_container_paused_do_retry__FINEST", aContainer);
    return true; // retry
  }
  if (e instanceof Exception && pauseContainer(aContainer, (Exception) e, threadId)) {
    maybeLogFinest("UIMA_CPM_pausing_container__FINEST", aContainer);
    threadState = 2011;
    // Pause the container while the CPM is re-connecting to un-managed service
    // that is shared by all processing threads
    aContainer.pause();
    threadId = Thread.currentThread().getName();
  }
  try {
    // Increments error counter and determines if any threshold have been reached. If
    // the max error rate is reached, the CasProcessor can be configured as follows:
    // - terminates CPM when threshold is reached ( method below throws AbortCPMException)
    // - disables CasProcessor ( method below throws AbortCasProcessorException )
    // - continue, CasProcessor continues to run despite error
    aContainer.incrementCasProcessorErrors(e);
  } catch (AbortCPMException ex) {
    // The exception should terminate the CPM
    retry = false;
    if (aContainer.isPaused()) {
      aContainer.resume();
    }
    aContainer.setStatus(Constants.CAS_PROCESSOR_KILLED);
    maybeLogFinest("UIMA_CPM_EXP_configured_to_abort__WARNING", processorName);
    throw new AbortCPMException(CpmLocalizedMessage.getLocalizedMessage(
            CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_EXP_configured_to_abort__WARNING",
            new Object[] { Thread.currentThread().getName(), containerName }));
  } catch (AbortCasProcessorException ex) {
    // The CasProcessor is to be disabled due to excessive errors
    retry = false;
    maybeLogFinest("UIMA_CPM_disable_cp__SEVERE", processorName);
    if (aContainer.isPaused()) {
      aContainer.resume();
    }
    aContainer.setStatus(Constants.CAS_PROCESSOR_DISABLED);
  } catch (ServiceConnectionException ex) {
    // The CasProcessor may need to be redeployed; retry stays true unless recovery throws
    aProcessTrace.startEvent(containerName, "Process", "");
    String status = "failure";
    try {
      threadState = 2012;
      handleServiceException(aContainer, aProcessor, aProcessTrace, ex);
      status = "success";
    } catch (ResourceProcessException rpe) {
      throw rpe;
    } catch (Exception rpe) {
      throw new ResourceProcessException(rpe);
    } finally {
      aProcessTrace.endEvent(containerName, "Process", status);
    }
  } catch (SkipCasException ex) {
    try {
      handleSkipCasProcessor(aContainer, aCasObjectList, false);
      retry = false;
    } catch (Exception sEx) {
      throw new ResourceProcessException(sEx);
    }
  } catch (Exception ex) {
    maybeLogSevereException(ex);
    if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
      // done as 2 messages because there is no method supporting
      // both a Throwable, and a message with substitutable args, in the logger
      logSevere("UIMA_CPM_unhandled_error__SEVERE", e.getLocalizedMessage());
      UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE,
              this.getClass().getName(),
              "handleErrors",
              CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
              "UIMA_CPM_unexpected_exception__SEVERE",
              ex);
    }
    retry = false;
    ex.printStackTrace();
  }
  return retry;
}
/**
 * Runs a CasObject-based CasProcessor over every entry of the given list. A fresh
 * {@code casList} array is allocated and filled either by casting the entry (when the list
 * already holds CAS objects) or by converting it from CasData. Processing stops at the first
 * null entry, which is reported at SEVERE level. The array passed in is not modified; results
 * are left in the {@code casList} field.
 *
 * @param container - container managing the CasProcessor
 * @param processor - CasProcessor to invoke; an AnalysisEngine is called through
 *          {@code process()}, anything else through {@code CasObjectProcessor.processCas()}
 * @param aCasObjectList - entries to analyze
 * @param pTrTemp - process trace receiving events for each processed entry
 * @param isCasObject - true when aCasObjectList holds CAS objects, false when it holds CasData
 * @throws Exception - any failure raised by conversion or by the CasProcessor
 */
private void invokeCasObjectCasProcessor(ProcessingContainer container, CasProcessor processor,
        Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject) throws Exception {
  maybeLogMemoryFinest();
  maybeLogFinest("UIMA_CPM_invoke_cp_process__FINEST", container, processor);
  casList = new CAS[aCasObjectList.length];
  for (int i = 0; i < aCasObjectList.length; i++) {
    maybeLogFinest("UIMA_CPM_initialize_cas__FINEST", container);
    final Object entry = aCasObjectList[i];
    if (entry == null) {
      // A null entry ends the pass; the remaining entries are not processed.
      if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
        UIMAFramework.getLogger(this.getClass()).logrb(
                Level.SEVERE,
                this.getClass().getName(),
                "process",
                CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_casobjectlist_is_null__SEVERE",
                new Object[] { Thread.currentThread().getName(), container.getName(),
                    String.valueOf(i) });
      }
      break;
    }
    if (isCasObject) {
      casList[i] = (CAS) entry;
    } else {
      convertCasDataToCasObject(i, container.getName(), aCasObjectList);
    }
    if (processor instanceof AnalysisEngine) {
      maybeLogFinest("UIMA_CPM_call_process__FINEST", container, processor);
      threadState = 2005;
      pTrTemp.aggregate(((AnalysisEngine) processor).process(casList[i]));
      maybeLogFinest("UIMA_CPM_call_process_completed__FINEST", container, processor);
    } else {
      pTrTemp.startEvent(container.getName(), "Process", "");
      threadState = 2006;
      maybeLogFinest("UIMA_CPM_call_process__FINEST", container, processor);
      ((CasObjectProcessor) processor).processCas(casList[i]);
      maybeLogFinest("UIMA_CPM_call_process_completed__FINEST", container, processor);
    }
    pTrTemp.endEvent(container.getName(), "Process", "success");
  }
}
/**
 * Fills {@code casList[casIndex]} with a reset CAS and loads the CasData found at the same
 * index of the given list into it. The CAS comes from {@code casCache} when an instance is
 * cached for that index (the cache slot is then cleared); otherwise the CAS pool is polled
 * until it yields one.
 *
 * @param casIndex - index into aCasObjectList, casList and casCache
 * @param aContainerName - container name, used for logging only
 * @param aCasObjectList - list whose entry at casIndex is cast to CasData and converted
 * @throws Exception - any failure raised while obtaining, resetting or filling the CAS
 */
private void convertCasDataToCasObject(int casIndex, String aContainerName,
        Object[] aCasObjectList) throws Exception {
  final boolean cachedCasAvailable = casCache != null && casCache[casIndex] != null;
  if (cachedCasAvailable) {
    casList[casIndex] = casCache[casIndex];
    casList[casIndex].reset();
    // Cas is used up
    casCache[casIndex] = null;
  } else {
    // No cached CAS. This may happen if the CollectionReader is CasData based and this is the
    // first CasObject based annotator in the chain.
    casList[casIndex] = null;
    do {
      maybeLogFinest("UIMA_CPM_get_cas_from_pool__FINEST", aContainerName);
      // NOTE(review): the argument passed to getCas is 0; confirm what that means for the
      // pool's wait behavior.
      casList[casIndex] = casPool.getCas(0);
      maybeLogFinest("UIMA_CPM_got_cas_from_pool__FINEST", aContainerName);
    } while (casList[casIndex] == null);
    maybeLogFinest("UIMA_CPM_call_cas_reset__FINEST", aContainerName);
    casList[casIndex].reset();
  }
  // Convert CasData to CAS
  mConverter.casDataToCasContainer((CasData) aCasObjectList[casIndex], casList[casIndex], true);
}
/**
 * Runs a CasData-based CasProcessor over the given list. When the list holds CAS objects they
 * are first converted to CasData, reset, and remembered in {@code casCache}. Byte counts are
 * reported to the container before and after the call, except on a retry. The time spent in
 * the {@code process()} call is added to the container's total time.
 *
 * @param container - container managing the CasProcessor
 * @param processor - CasProcessor to invoke; cast to CasDataProcessor
 * @param aCasObjectList - entries to analyze (CAS objects or CasData)
 * @param pTrTemp - process trace receiving the start and end events of this call
 * @param isCasObject - true when aCasObjectList holds CAS objects that need conversion
 * @param retry - true when this call repeats a failed attempt; byte counts are then not
 *          added again (except for the CasDataConsumer case below)
 * @throws Exception - any failure raised by conversion or by the CasProcessor
 */
private void invokeCasDataCasProcessor(ProcessingContainer container, CasProcessor processor,
        Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject, boolean retry)
        throws Exception {
  maybeLogFinest("UIMA_CPM_cas_data_processor__FINEST", container, processor);
  pTrTemp.startEvent(container.getName(), "Process", "");
  // Check if the CasObject to CasData conversion is necessary
  if (isCasObject) {
    final CasData[] converted = new CasData[aCasObjectList.length];
    for (int i = 0; i < aCasObjectList.length; i++) {
      final CAS cas = (CAS) aCasObjectList[i];
      converted[i] = mConverter.casContainerToCasData(cas);
      // After the conversion reset the CAS so that it can be reused
      if (cas != null) {
        cas.reset();
      }
    }
    // Cache the CAS list for possible reuse
    casCache = (CAS[]) aCasObjectList;
    // From here on the working list is the converted one.
    aCasObjectList = converted;
  }
  long byteCount = 0;
  if (!retry) {
    for (Object inbound : aCasObjectList) {
      byteCount = getBytes(inbound);
      container.addBytesIn(byteCount);
    }
  }
  Object[] casObjects = aCasObjectList;
  final long pStart = System.currentTimeMillis();
  if (!(casObjects instanceof CasData[])) {
    maybeLogFinest("UIMA_CPM_expected_casdata__FINEST", casObjects.getClass().getName());
  }
  maybeLogFinest("UIMA_CPM_call_process__FINEST", container, processor);
  casObjects = ((CasDataProcessor) processor).process((CasData[]) casObjects);
  maybeLogFinest("UIMA_CPM_call_process_completed__FINEST", container, processor);
  final long pEnd = System.currentTimeMillis();
  container.incrementTotalTime((pEnd - pStart));
  if (casObjects != null) {
    if (processor instanceof CasDataConsumer) {
      // byteCount holds the size of the last inbound entry (or 0 on a retry).
      container.addBytesOut(byteCount);
    } else if (!retry) {
      for (Object outbound : casObjects) {
        final long outboundBytes = getBytes(outbound);
        container.addBytesOut(outboundBytes);
      }
    }
  }
  pTrTemp.endEvent(container.getName(), "Process", "success");
}
/*
 * Logging helpers.
 *
 * The overloads below are special forms for frequently used argument sets. The "maybe"
 * versions test isLoggable before building the message arguments; the plain versions log
 * unconditionally. Additional arguments are passed to the logger as an Object array.
 */
// Shared empty argument array, used when no arguments are supplied.
private static final Object [] zeroLengthObjectArray = new Object[0];
// Fully qualified name of ProcessingUnit.
private static final String thisClassName = ProcessingUnit.class.getName();
/**
 * Logs a message from the CPM resource bundle at the given level. The current thread name is
 * prepended to the supplied arguments, so it is always the first substitution argument.
 *
 * @param level - log level
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param args - substitution arguments; null is treated as no arguments
 */
private void logCPM(Level level, String msgBundleId, Object[] args) {
  final Object[] supplied = (args == null) ? zeroLengthObjectArray : args;
  final Object[] withThreadName = new Object[supplied.length + 1];
  withThreadName[0] = Thread.currentThread().getName();
  System.arraycopy(supplied, 0, withThreadName, 1, supplied.length);
  // "process" is reported as the source method name for every message.
  UIMAFramework.getLogger(this.getClass()).logrb(level, thisClassName, "process",
          CPMUtils.CPM_LOG_RESOURCE_BUNDLE, msgBundleId, withThreadName);
}
/**
 * Logs a message without extra arguments at FINEST, if FINEST is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 */
private void maybeLogFinest(String msgBundleId) {
  if (!UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    return;
  }
  logFinest(msgBundleId);
}
/**
 * Logs a message without extra arguments at FINEST, without checking the log level first.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 */
private void logFinest(String msgBundleId) {
  final Object[] noArgs = null;
  logCPM(Level.FINEST, msgBundleId, noArgs);
}
/**
 * Logs a message with one argument at FINEST, if FINEST is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param arg1 - substitution argument
 */
private void maybeLogFinest(String msgBundleId, String arg1) {
  if (!UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    return;
  }
  logFinest(msgBundleId, arg1);
}
/**
 * Logs a message with one argument at FINEST, without checking the log level first.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param arg1 - substitution argument
 */
private void logFinest(String msgBundleId, String arg1) {
  final Object[] args = { arg1 };
  logCPM(Level.FINEST, msgBundleId, args);
}
/**
 * Logs a message with two arguments at FINEST, if FINEST is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param arg1 - first substitution argument
 * @param arg2 - second substitution argument
 */
private void maybeLogFinest(String msgBundleId, String arg1, String arg2) {
  if (!UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    return;
  }
  logFinest(msgBundleId, arg1, arg2);
}
/**
 * Logs a message with two arguments at FINEST, without checking the log level first.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param arg1 - first substitution argument
 * @param arg2 - second substitution argument
 */
private void logFinest(String msgBundleId, String arg1, String arg2) {
  final Object[] args = { arg1, arg2 };
  logCPM(Level.FINEST, msgBundleId, args);
}
/**
 * Logs a message with three arguments at FINEST, without checking the log level first.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param arg1 - first substitution argument
 * @param arg2 - second substitution argument
 * @param arg3 - third substitution argument
 */
private void logFinest(String msgBundleId, String arg1, String arg2, String arg3) {
  final Object[] args = { arg1, arg2, arg3 };
  logCPM(Level.FINEST, msgBundleId, args);
}
/**
 * Special common two-argument form: logs the container name and the CasProcessor class name
 * at FINEST, if FINEST is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param container - container whose name is logged
 * @param processor - CasProcessor whose class name is logged; may be null, in which case
 *          "NULL" is logged instead
 */
private void maybeLogFinest(String msgBundleId, ProcessingContainer container, CasProcessor processor) {
  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    // Callers log from error/cleanup paths where no CasProcessor may have been obtained;
    // a trace message must not fail with a NullPointerException in that case.
    String processorName = (processor == null) ? "NULL" : processor.getClass().getName();
    logFinest(msgBundleId, container.getName(), processorName);
  }
}
/**
 * Logs the container name and the CasProcessor class name at FINEST, without checking the log
 * level first.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param container - container whose name is logged
 * @param processor - CasProcessor whose class name is logged
 */
private void logFinest(String msgBundleId, ProcessingContainer container, CasProcessor processor) {
  final String nameOfContainer = container.getName();
  final String processorClass = processor.getClass().getName();
  logFinest(msgBundleId, nameOfContainer, processorClass);
}
/**
 * Logs the container name at FINEST, if FINEST is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param container - container whose name is logged
 */
private void maybeLogFinest(String msgBundleId, ProcessingContainer container) {
  if (!UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    return;
  }
  logFinest(msgBundleId, container.getName());
}
/**
 * Logs the CasProcessor class name at FINEST, if FINEST is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param processor - CasProcessor whose class name is logged; may be null, in which case
 *          "NULL" is logged instead
 */
private void maybeLogFinest(String msgBundleId, CasProcessor processor) {
  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    // This overload is used from error handling, where the CasProcessor may be null;
    // a trace message must not fail with a NullPointerException in that case.
    String processorName = (processor == null) ? "NULL" : processor.getClass().getName();
    logFinest(msgBundleId, processorName);
  }
}
/**
 * Logs the container name, the CasProcessor class name, and whether the given CAS cache is
 * null ("true"/"false") at FINEST, if FINEST is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param container - container whose name is logged
 * @param processor - CasProcessor whose class name is logged
 * @param casCache - CAS cache; only its nullness is logged
 */
private void maybeLogFinest(String msgBundleId, ProcessingContainer container, CasProcessor processor, CAS [] casCache) {
  if (!UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    return;
  }
  final String nameOfContainer = container.getName();
  final String processorClass = processor.getClass().getName();
  final String cacheIsNull = String.valueOf(casCache == null);
  logFinest(msgBundleId, nameOfContainer, processorClass, cacheIsNull);
}
/**
 * Logs whether the given CAS cache is null ("true"/"false") at FINEST, if FINEST is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param casCache - CAS cache; only its nullness is logged
 */
private void maybeLogFinest(String msgBundleId, CAS [] casCache) {
  if (!UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    return;
  }
  final String cacheIsNull = String.valueOf(casCache == null);
  logFinest(msgBundleId, cacheIsNull);
}
/**
 * Logs JVM memory figures at FINEST, if FINEST is loggable.
 */
private void maybeLogMemoryFinest() {
  if (!UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    return;
  }
  logMemoryFinest();
}
/**
 * Logs the JVM's total and free memory, each divided by 1024, at FINEST without checking the
 * log level first.
 */
private void logMemoryFinest() {
  final String totalKb = String.valueOf(Runtime.getRuntime().totalMemory() / 1024);
  final String freeKb = String.valueOf(Runtime.getRuntime().freeMemory() / 1024);
  logFinest("UIMA_CPM_show_memory__FINEST", totalKb, freeKb);
}
/**
 * Logs a message without extra arguments at WARNING, without checking the log level first.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 */
private void logWarning(String msgBundleId) {
  final Object[] noArgs = null;
  logCPM(Level.WARNING, msgBundleId, noArgs);
}
/**
 * Logs a message with two arguments at WARNING, if WARNING is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param arg1 - first substitution argument
 * @param arg2 - second substitution argument
 */
private void maybeLogWarning(String msgBundleId, String arg1, String arg2) {
  if (!UIMAFramework.getLogger().isLoggable(Level.WARNING)) {
    return;
  }
  logWarning(msgBundleId, arg1, arg2);
}
/**
 * Logs a message with two arguments at WARNING, without checking the log level first.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param arg1 - first substitution argument
 * @param arg2 - second substitution argument
 */
private void logWarning(String msgBundleId, String arg1, String arg2) {
  final Object[] args = { arg1, arg2 };
  logCPM(Level.WARNING, msgBundleId, args);
}
/**
 * Logs a message without extra arguments at SEVERE, if SEVERE is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 */
private void maybeLogSevere(String msgBundleId) {
  if (!UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
    return;
  }
  final Object[] noArgs = null;
  logCPM(Level.SEVERE, msgBundleId, noArgs);
}
/**
 * Logs a message with one argument at SEVERE, if SEVERE is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param arg1 - substitution argument
 */
private void maybeLogSevere(String msgBundleId, String arg1) {
  if (!UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
    return;
  }
  logSevere(msgBundleId, arg1);
}
/**
 * Logs a message with one argument at SEVERE, without checking the log level first.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param arg1 - substitution argument
 */
private void logSevere(String msgBundleId, String arg1) {
  final Object[] args = { arg1 };
  logCPM(Level.SEVERE, msgBundleId, args);
}
/**
 * Logs a message with two arguments at SEVERE, if SEVERE is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param arg1 - first substitution argument
 * @param arg2 - second substitution argument
 */
private void maybeLogSevere(String msgBundleId, String arg1, String arg2) {
  if (!UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
    return;
  }
  logSevere(msgBundleId, arg1, arg2);
}
/**
 * Logs a message with two arguments at SEVERE, without checking the log level first.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param arg1 - first substitution argument
 * @param arg2 - second substitution argument
 */
private void logSevere(String msgBundleId, String arg1, String arg2) {
  final Object[] args = { arg1, arg2 };
  logCPM(Level.SEVERE, msgBundleId, args);
}
/**
 * Logs a message with three arguments at SEVERE, if SEVERE is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param arg1 - first substitution argument
 * @param arg2 - second substitution argument
 * @param arg3 - third substitution argument
 */
private void maybeLogSevere(String msgBundleId, String arg1, String arg2, String arg3) {
  if (!UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
    return;
  }
  logSevere(msgBundleId, arg1, arg2, arg3);
}
/**
 * Logs a message with three arguments at SEVERE, without checking the log level first.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param arg1 - first substitution argument
 * @param arg2 - second substitution argument
 * @param arg3 - third substitution argument
 */
private void logSevere(String msgBundleId, String arg1, String arg2, String arg3) {
  final Object[] args = { arg1, arg2, arg3 };
  logCPM(Level.SEVERE, msgBundleId, args);
}
/**
 * Logs a throwable at SEVERE, if SEVERE is loggable. The message text is built from the
 * current thread name and the throwable's localized message; the throwable itself is passed
 * to the logger as well.
 *
 * @param e - throwable to log
 */
private void maybeLogSevereException(Throwable e) {
  if (!UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
    return;
  }
  final String threadName = Thread.currentThread().getName();
  final String text = "Thread: " + threadName + ", message: " + e.getLocalizedMessage();
  UIMAFramework.getLogger().log(Level.SEVERE, text, e);
}
/**
 * Logs the name and current size of a work queue at FINEST, if FINEST is loggable.
 *
 * @param msgBundleId - key of the message in the CPM resource bundle
 * @param workQueue - queue whose name and current size are logged
 */
private void maybeLogFinestWorkQueue(String msgBundleId, BoundedWorkQueue workQueue) {
  // The guard must test the level the message is actually logged at (FINEST). Testing
  // SEVERE made the queue inspection run even when the FINEST message would be discarded.
  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    logFinest(msgBundleId, workQueue.getName(), String.valueOf(workQueue.getCurrentSize()));
  }
}
}