blob: d41f33f860ce11edf2c3426c6b1599b64572adbc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.collection.impl.cpm.engine;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Map;
import org.apache.uima.UIMAFramework;
import org.apache.uima.UimaContextAdmin;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.impl.CASImpl;
import org.apache.uima.cas_data.CasData;
import org.apache.uima.collection.CasInitializer;
import org.apache.uima.collection.CollectionException;
import org.apache.uima.collection.CollectionReader;
import org.apache.uima.collection.StatusCallbackListener;
import org.apache.uima.collection.base_cpm.BaseCollectionReader;
import org.apache.uima.collection.base_cpm.CasDataCollectionReader;
import org.apache.uima.collection.base_cpm.SkipCasException;
import org.apache.uima.collection.impl.EntityProcessStatusImpl;
import org.apache.uima.collection.impl.cpm.Constants;
import org.apache.uima.collection.impl.cpm.utils.CPMUtils;
import org.apache.uima.collection.impl.cpm.utils.ChunkMetadata;
import org.apache.uima.collection.impl.cpm.vinci.DATACasUtils;
import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.resource.metadata.Capability;
import org.apache.uima.util.Level;
import org.apache.uima.util.ProcessTrace;
import org.apache.uima.util.Progress;
import org.apache.uima.util.UimaTimer;
import org.apache.uima.util.impl.ProcessTrace_impl;
/**
* Component responsible for continuously filling a work queue with bundles containing Cas'es. The
* queue is shared with a Processing Pipeline that consumes bundles of Cas. As soon as the the
* bundle is removed from the queue, this component fetches data from configured Collection Reader
* and enques it onto the queue. This component facilitates asynchronous reading and processing of
* CAS by seperate threads running in the CPE.
*
* When end of processing is reached due to CPM shutdown or max number of entities are processed a
* special token, called EOFToken is placed onto a queue. It marks end of processing for Processing
* Units. No more data is expected to be placed on the work queue. The Processing Threads upon
* seeing the EOFToken are expected to complete processing and do necessary cleanup.
*
*
*/
public class ArtifactProducer extends Thread {
public int threadState = 0;
private CPECasPool casPool;
// Queue shared wit ProcessingUnits
private BoundedWorkQueue workQueue = null;
// private BoundedWorkQueue outputQueue = null;
private BaseCollectionReader collectionReader = null;
// Number of CAS'es for each fetch from the CollectionReader
private int readerFetchSize = 1;
private CAS[] casList;
private long entityCount = 0;
private long maxToProcess;
private CPMEngine cpm = null;
private Map cpmStatTable = null;
private String[] lastDocId = { "" };
private long totalFetchTime = 0;
private UimaTimer timer = null;
private ArrayList callbackListeners = null;
private Hashtable timedoutDocs = new Hashtable();
private boolean isRunning = false;
private ProcessTrace globalSharedProcessTrace = null;
/**
* Instantiates and initializes this instance.
*
* @param acpm
*/
public ArtifactProducer(CPMEngine acpm) {
cpm = acpm;
if (cpm != null) {
callbackListeners = cpm.getCallbackListeners();
}
}
/**
* Construct instance of this class with a reference to the cpe engine and a pool of cas'es.
*
* @param acpm -
* reference to the cpe
* @param aPool -
* pool of cases
*/
public ArtifactProducer(CPMEngine acpm, CPECasPool aPool) {
cpm = acpm;
casPool = aPool;
if (cpm != null) {
callbackListeners = cpm.getCallbackListeners();
}
}
public boolean isRunning() {
return isRunning;
}
/**
* Plug in Custom Timer to time events
*
* @param aTimer -
* custom timer
*/
public void setUimaTimer(UimaTimer aTimer) {
timer = aTimer;
}
public void setProcessTrace(ProcessTrace aProcTrace) {
globalSharedProcessTrace = aProcTrace;
}
/**
* Returns total time spent when fetching entities from a CollectionReader. This provides a way of
* gauging throughput of a particular CR.
*
* @return total time spent when fetching entities. -1 when the fetch time is unknown.
*/
public long getCollectionReaderTotalFetchTime() {
if (timer != null && totalFetchTime > 0) {
return totalFetchTime;
}
return -1;
}
/**
* Null out fields of this object. Call this only when this object is no longer needed.
*/
public void cleanup() {
this.casPool = null;
this.workQueue = null;
this.collectionReader = null;
this.casList = null;
this.cpm = null;
if (this.cpmStatTable != null) {
this.cpmStatTable.clear();
this.cpmStatTable = null;
}
this.lastDocId = null;
}
/**
* Assign total number of entities to process
*
* @param aNumToProcess -
* number of entities to read from the Collection Reader
*/
public void setNumEntitiesToProcess(long aNumToProcess) {
maxToProcess = aNumToProcess;
}
/**
* Assign CollectionReader to be used for reading
*
* @param aCollectionReader -
* collection reader as source of data
*/
public void setCollectionReader(BaseCollectionReader aCollectionReader) {
collectionReader = aCollectionReader;
if (collectionReader.getProcessingResourceMetaData().getConfigurationParameterSettings()
.getParameterValue("fetchSize") != null) {
// Determines how many at a time this Collection Reader will return
// for each fetch
readerFetchSize = ((Integer) collectionReader.getProcessingResourceMetaData()
.getConfigurationParameterSettings().getParameterValue("fetchSize")).intValue();
}
}
/**
* Assigns a queue where the artifacts produced by this component will be deposited
*
* @param aQueue -
* queue for the artifacts this class is producing
*/
public void setWorkQueue(BoundedWorkQueue aQueue) {
workQueue = aQueue;
}
/**
* Add table that will contain statistics gathered while reading entities from a Collection This
* table is used for non-uima reports.
*
* @param aStatTable
*/
public void setCPMStatTable(Map aStatTable) {
cpmStatTable = aStatTable;
}
/**
* Determines if the CPM has processed configured number of entities. Called after each fetch from
* the Collection Reader.
*
* @return true - all configurted entities processed, false otherwise
*/
private boolean endOfProcessingReached() {
// Special case, -1 means all entities in the corpus
if (maxToProcess == -1) {
return false;
} else if (maxToProcess == 0) {
return true;
} else {
// check if exceeded or matched the configured max number of
// entities
return (entityCount >= maxToProcess);
}
}
/**
* Fills the queue up to capacity. This is called before activating ProcessingPipeline as means of
* optimizing processing. When pipelines start up there are already entities in the work queue to
* process.
*/
public void fillQueue() throws Exception {
// Create an array holding CAS'es. Configuration of the Reader may
// include
// a number of CAS'es to fetch at a time. In this case the array will
// have
// size greater than the default (1)
Object[] casObjectList = new Object[1];
long capacity = workQueue.getCapacity();
if (capacity > maxToProcess) {
capacity = maxToProcess;
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_reset_queue_size__FINEST",
new Object[] { Thread.currentThread().getName(), String.valueOf(capacity) });
}
}
try {
// Fill the work queue with entities from the CollectionReader
// capacity=number of slots in the work queue
for (int i = 0; i < capacity; i++) {
if (collectionReader.hasNext()) {
// The CollectionReader returns 1 or more entities at a
// time. The CollectionReader
// configuration determines how many it will return for each
// getNext() call. In case
// WF Large Store, the 'readerFetchSize' parameter is not
// used, since the store itself
// determines how many entities to return for each fetch.
casObjectList = readNext(readerFetchSize);
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(
Level.FINEST,
this.getClass().getName(),
"process",
CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_enqueue_cas_bundle__FINEST",
new Object[] { Thread.currentThread().getName(),
String.valueOf(casObjectList.length) });
}
// Count number of entities fetched so far
entityCount += casObjectList.length;
// append entities to queue
workQueue.enqueue(casObjectList);
// If CollectionReader returns bundles instead of individual
// entity
// make the loop termines if exceeding total number of
// entities to
// process
if (entityCount > maxToProcess) {
break;
}
}
}
if (cpmStatTable != null) {
Progress[] progress = collectionReader.getProgress();
if (progress != null) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(
Level.FINEST,
this.getClass().getName(),
"process",
CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_show_cr_progress__FINEST",
new Object[] { Thread.currentThread().getName(),
String.valueOf(progress[0].getCompleted()) });
}
}
cpmStatTable.put("COLLECTION_READER_PROGRESS", progress);
}
} catch (Exception e) {
if (casObjectList == null) {
notifyListeners(null, e);
} else {
// Release CAS's back to the cas pool.
for (int i = 0; casObjectList != null && i < casObjectList.length; i++) {
if (casObjectList[i] != null && casObjectList[i] instanceof CAS) {
notifyListeners((CAS) casObjectList[i], e);
casPool.releaseCas(casList[i]);
casList[i] = null;
// synchronized (casPool) { // removed - redundant, because done as part of releaseCas
// casPool.notifyAll();
// }
} else {
notifyListeners(null, e);
}
}
}
throw e;
}
}
/**
* Reads next set of entities from the CollectionReader. This method may return more than one Cas
* at a time.
*
* @parma fetchSize - number of entities the CollectionReader should return for each fetch. It is
* hint as the Collection Reader ultimately decides how many to return.
*
* @return - The Object returned from the method depends on the type of the CollectionReader.
* Either CASData[] or CASObject[] initialized with document metadata and content is
* returned. If the CollectionReader has no more entities (EOF), null is returned.
*
* @throws IOException -
* error while reading corpus
* @throws CollectionException
*/
private Object[] readNext(int fetchSize) throws IOException, CollectionException {
ProcessTrace localTrace = new ProcessTrace_impl(cpm.getPerformanceTuningSettings());
boolean success = false;
Object[] casObjects = null;
threadState = 1000; // Entering hasNext()
// Checks if the CollectionReader has any documents left
long start = 0;
if (timer != null) {
start = timer.getTimeInMillis();
}
boolean eventStarted = false;
// CasObject based CollectionReader does not support returning more than
// one CAS at a time. So
// fake support for this by calling its getNext() until the casList is
// filled to max capacity.
// The capacity of casList is equal to the CollectionReader fetchSize,
// defined in CR descriptor.
if (collectionReader instanceof CollectionReader) {
casList = new CAS[fetchSize];
for (int i = 0; i < fetchSize; i++) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_fetch_new_cas__FINEST",
new Object[] { Thread.currentThread().getName() });
}
threadState = 1001; // Waiting for CAS
// Get the cas from the pool.
while (cpm.isRunning() && (casList[i] = casPool.getCas(0)) == null)
; // intentionally empty while loop
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(
Level.FINEST,
this.getClass().getName(),
"process",
CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_cr_check_cas_for_null__FINEST",
new Object[] { Thread.currentThread().getName(),
String.valueOf((casList[i] == null)) });
}
if (cpm.isRunning() == false) {
// CPM is in shutdown stage. No need to enqueue additional
// documents/CAS'es. Just release
// those that have been aquired so far back to the pool and
// return null, indicating
// end of processing.
if (timer != null) {
totalFetchTime += (timer.getTimeInMillis() - start);
}
for (int listCounter = 0; casList != null && casList[i] != null
&& listCounter < casList.length; listCounter++) {
casPool.releaseCas(casList[listCounter]);
// synchronized (casPool) { // redundant - releaseCas call does this
// casPool.notifyAll();
// }
}
if (cpmStatTable != null) {
Progress[] progress = collectionReader.getProgress();
cpmStatTable.put("COLLECTION_READER_PROGRESS", progress);
cpmStatTable.put("COLLECTION_READER_TIME", Long.valueOf(totalFetchTime));
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_in_shutdown_state__FINEST",
new Object[] { Thread.currentThread().getName() });
}
return null;
}
if (casList[i] == null) {
return null;
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_got_new_cas__FINEST",
new Object[] { Thread.currentThread().getName() });
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_call_cas_reset__FINEST",
new Object[] { Thread.currentThread().getName() });
}
casList[i].reset();
// If Collection Reader and CAS Initilaizer do not declare any
// output SofAs, must be passed the default view (meaning whatever's
//mapped to _InitialView) for backward compatiblity
Capability[] capabilities;
CasInitializer casIni = ((CollectionReader) collectionReader).getCasInitializer();
if (casIni != null)
capabilities = casIni.getProcessingResourceMetaData().getCapabilities();
else
capabilities = ((CollectionReader) collectionReader).getProcessingResourceMetaData()
.getCapabilities();
boolean sofaUnaware = true;
for (int j = 0; j < capabilities.length; j++) {
if (capabilities[j].getOutputSofas().length > 0) {
sofaUnaware = false;
break;
}
}
threadState = 1003; // Entering
// set the current component info of the CAS, so that it knows
// the sofa
// mappings for the component that's about to process it
UimaContextAdmin context = ((CollectionReader) collectionReader).getUimaContextAdmin();
casList[i].setCurrentComponentInfo(context.getComponentInfo());
try {
if (sofaUnaware) {
// sofa-unaware CR, give it whatever is mapped to the
// initial view (creating that view first if it's not the default)
String absSofaName = context.getComponentInfo().mapToSofaID(CAS.NAME_DEFAULT_SOFA);
if (!CAS.NAME_DEFAULT_SOFA.equals(absSofaName)) {
casList[i].createView(CAS.NAME_DEFAULT_SOFA);
}
CAS view = casList[i].getView(CAS.NAME_DEFAULT_SOFA);
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_call_cr_next__FINEST",
new Object[] { Thread.currentThread().getName(), "CAS" });
}
localTrace.startEvent(collectionReader.getProcessingResourceMetaData().getName(),
"Process", "");
eventStarted = true;
((CollectionReader) collectionReader).getNext(view);
localTrace.endEvent(collectionReader.getProcessingResourceMetaData().getName(),
"Process", "success");
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_call_cr_next_finished__FINEST",
new Object[] { Thread.currentThread().getName(), "CAS" });
}
} else
// sofa-aware CR, give it the base CAS
{
CAS baseCas = ((CASImpl) casList[i]).getBaseCAS();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_call_cr_next__FINEST",
new Object[] { Thread.currentThread().getName(), "CAS" });
}
localTrace.startEvent(collectionReader.getProcessingResourceMetaData().getName(),
"Process", "");
eventStarted = true;
((CollectionReader) collectionReader).getNext(baseCas);
localTrace.endEvent(collectionReader.getProcessingResourceMetaData().getName(),
"Process", "success");
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_call_cr_next_finished__FINEST",
new Object[] { Thread.currentThread().getName(), "CAS" });
}
}
success = true;
} finally {
// be sure to unset the component info in the CAS, since the
// CAS is no longer
// being processed by the CollectionReader
casList[i].setCurrentComponentInfo(null);
if (eventStarted) // use this to make sure we dont end event that has not been explicitely
// started
{
if (!success) {
localTrace.endEvent(collectionReader.getProcessingResourceMetaData().getName(),
"Process", "failure");
}
synchronized (globalSharedProcessTrace) {
globalSharedProcessTrace.aggregate(localTrace);
}
}
}
}
casObjects = casList;
if (casObjects != null && casObjects.length > 0) {
try {
if (((CASImpl) casList[0]).isBackwardCompatibleCas()) {
CAS view = casList[0].getView(CAS.NAME_DEFAULT_SOFA);
lastDocId[0] = ConsumerCasUtils.getStringFeatValue(view, Constants.METADATA_KEY,
Constants.DOC_ID);
} else {
lastDocId[0] = "";
}
} catch (Exception e) {
lastDocId[0] = "";
}
}
} else {
// Retrieve next set of CAS'es. fetchSize is hint to the
// CollectionReader how many to return
// Some CollectionReaders return a batch with size different than
// recommended in
// fetchSize. Most notably, Large WF Store decides itself how many
// entities to return for
// each fetch.
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_call_cr_next__FINEST",
new Object[] { Thread.currentThread().getName(), "CasData" });
}
localTrace.startEvent(collectionReader.getProcessingResourceMetaData().getName(), "Process",
"");
try {
casObjects = ((CasDataCollectionReader) collectionReader).getNext(fetchSize);
success = true;
} finally {
if (!success) {
localTrace.endEvent(collectionReader.getProcessingResourceMetaData().getName(),
"Process", "failure");
} else {
localTrace.endEvent(collectionReader.getProcessingResourceMetaData().getName(),
"Process", "success");
}
synchronized (globalSharedProcessTrace) {
globalSharedProcessTrace.aggregate(localTrace);
}
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_call_cr_next_finished__FINEST",
new Object[] { Thread.currentThread().getName(), "CasData" });
}
if (casObjects != null && casObjects.length > 0) {
lastDocId = DATACasUtils.getFeatureStructureValues((CasData) casObjects[0],
Constants.METADATA_KEY, Constants.DOC_ID);
}
}
if (timer != null) {
totalFetchTime += (timer.getTimeInMillis() - start);
}
if (cpmStatTable != null) {
Progress[] progress = collectionReader.getProgress();
cpmStatTable.put("COLLECTION_READER_PROGRESS", progress);
cpmStatTable.put("COLLECTION_READER_TIME", Long.valueOf(totalFetchTime));
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_return_cases_from_cr__FINEST",
new Object[] { Thread.currentThread().getName(), "CAS" });
}
return casObjects;
}
/**
* Runs this thread until the CPM halts or the CollectionReader has no more entities. It
* continuously fills the work queue with entities returned by the CollectionReader.
*/
public void run() {
boolean crEventCompleted = false; // this flag is used to mark the
// ProcessTrace event
if (!cpm.isRunning()) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cpm_not_running__WARNING",
new Object[] { Thread.currentThread().getName() });
return;
}
Object[] casObjectList = null;
// Check if
if (endOfProcessingReached()) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_done_producing__FINEST",
new Object[] { Thread.currentThread().getName() });
}
placeEOFToken();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_eof_marker_enqueued__FINEST",
new Object[] { Thread.currentThread().getName() });
}
return;
}
isRunning = true;
ProcessTrace localTrace = new ProcessTrace_impl(cpm.getPerformanceTuningSettings());
while (cpm.isRunning()) {
casList = null;
casObjectList = null;
synchronized (cpm.lockForPause) {
if (cpm.isPaused()) {
try {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_pausing_cr__FINEST",
new Object[] { Thread.currentThread().getName() });
}
// Wait until resumed
cpm.lockForPause.wait();
} catch (Exception e) {
}
if (!cpm.isRunning()) {
break;
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_resume_cr__FINEST",
new Object[] { Thread.currentThread().getName() });
}
}
}
try {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_call_hasnext__FINEST",
new Object[] { Thread.currentThread().getName() });
}
threadState = 1004; // Entering hasNext()
// start the CR event
localTrace.startEvent(collectionReader.getProcessingResourceMetaData().getName(),
"Process", "");
crEventCompleted = false;
if (collectionReader.hasNext()) {
localTrace.endEvent(collectionReader.getProcessingResourceMetaData().getName(),
"Process", "success");
crEventCompleted = true;
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_get_cas_from_cr__FINEST",
new Object[] { Thread.currentThread().getName() });
}
casObjectList = readNext(readerFetchSize);
if (casObjectList != null) {
if (casObjectList instanceof CAS[]) {
boolean releasedCas = false;
for (int i = 0; i < casObjectList.length && casObjectList[i] != null; i++) {
ChunkMetadata meta = CPMUtils.getChunkMetadata((CAS) casObjectList[i]);
if (meta != null) {
if (timedoutDocs.containsKey(meta.getDocId())) {
notifyListeners(casList[i], new ResourceProcessException(new SkipCasException(
"Dropping CAS due chunk Timeout. Doc Id::" + meta.getDocId()
+ " Sequence:" + meta.getSequence())));
casPool.releaseCas((CAS) casObjectList[i]);
// synchronized (casPool) { // redundant, releaseCas call does this
// casPool.notifyAll();
// }
releasedCas = true;
}
}
}
if (releasedCas) {
continue;
}
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(
Level.FINEST,
this.getClass().getName(),
"process",
CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_place_cas_in_queue__FINEST",
new Object[] { Thread.currentThread().getName(),
String.valueOf(casObjectList.length) });
}
// Prevent processing of new CASes if the CPM has been
// killed hard. Allow processing of CASes
// while the CPM is in normal shutdown state.
// (Moved this code inside if (casObjectList != null)
// block to avoid NullPointerException. -Adam
if (cpm.isRunning() == true
|| (cpm.isRunning() == false && cpm.isHardKilled() == false)) {
threadState = 1005; // Entering enqueue
workQueue.enqueue(casObjectList);
// synchronized (workQueue) { // redundant, enqueue does this
// workQueue.notifyAll();
// }
threadState = 1006; // Done Entering enqueue
entityCount += casObjectList.length;
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(
Level.FINEST,
this.getClass().getName(),
"process",
CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_placed_cas_in_queue__FINEST",
new Object[] { Thread.currentThread().getName(),
String.valueOf(casObjectList.length) });
}
} else {
break; // CPM has been killed
}
} else {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_terminate_cr_thread__FINEST",
new Object[] { Thread.currentThread().getName() });
}
break; // Null should not be returned from getNext
// unless the CPM is in shutdown mode
}
} else {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_processed_all__FINEST",
new Object[] { Thread.currentThread().getName() });
}
// Stops the CPM and all of the running threads.
// cpm.stopIt(); APL - don't stop, just terminate this
// thread, which CPMEngine has joined on
break;
}
// Check if the CollectionReader retrieved expected number of
// entities
if (endOfProcessingReached()) {
threadState = 1010; // End of processing
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_end_of_processing__FINEST",
new Object[] { Thread.currentThread().getName() });
}
break;
}
} catch (Exception e) {
// The following conditional is true if hasNext() has failed
if (!crEventCompleted) {
localTrace.endEvent(collectionReader.getProcessingResourceMetaData().getName(),
"Process", "failure");
}
// e.printStackTrace();
if (UIMAFramework.getLogger().isLoggable(Level.FINER)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
new Object[] { Thread.currentThread().getName(), e.getMessage() });
UIMAFramework.getLogger(this.getClass()).log(Level.FINER, e.getMessage(), e);
}
if (casList == null) {
notifyListeners(null, e);
} else {
// Notify Listeners and release CAS's back to the cas pool.
for (int i = 0; casList != null && i < casList.length; i++) {
if (casList[i] != null) {
notifyListeners(casList[i], e);
casPool.releaseCas(casList[i]);
casList[i] = null;
// synchronized (casPool) { // redundant, releaseCas does this
// casPool.notifyAll();
// }
} else {
notifyListeners(null, e);
}
casList = null;
}
}
} finally {
// Clear all events
synchronized (globalSharedProcessTrace) {
globalSharedProcessTrace.aggregate(localTrace);
}
localTrace.clear();
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_show_cpm_running_status__FINEST",
new Object[] { Thread.currentThread().getName(), String.valueOf(cpm.isRunning()) });
}
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_show_cpm_running_status__FINEST",
new Object[] { Thread.currentThread().getName(), String.valueOf(cpm.isRunning()) });
}
// Done with processing. Create a "special" EOF token and place it in
// the queue.
// Consumers of the queue must interpret this token as End Of File
// event, meaning
// end of processing. Such components must do appropriate cleanup and
// terminate.
placeEOFToken();
isRunning = false;
// Interrupt any waiting threads
Thread.currentThread().interrupt();
}
/**
* Notify registered callback listeners of a given exception.
*
* @param anException -
* exception to propagate to callback listeners
*/
private void notifyListeners(CAS aCas, Exception anException) {
for (int i = 0; callbackListeners != null && i < callbackListeners.size(); i++) {
StatusCallbackListener statCL = (StatusCallbackListener) callbackListeners.get(i);
ProcessTrace prTrace = new ProcessTrace_impl(cpm.getPerformanceTuningSettings());
EntityProcessStatusImpl aEntityProcStatus = new EntityProcessStatusImpl(prTrace);
aEntityProcStatus.addEventStatus("Collection Reader Failure", "failed", anException);
// Notify the listener that the Cas has been processed
CPMEngine.callEntityProcessCompleteWithCAS(statCL, aCas, aEntityProcStatus);
// statCL.entityProcessComplete(aCas, aEntityProcStatus);
}
}
/**
* Place terminating EOFToken into a Work Queue. Any thread reading this token from the queue is
* responsible for terminating itself.
*
*/
private void placeEOFToken() {
// Done with processing. Create a "special" EOF token and place it in
// the queue.
// Consumers of the queue must interpret this token as End Of File
// event, meaning
// end of processing. Such components must do appropriate cleanup and
// terminate.
try {
Object[] eofToken = new Object[1];
// only need one member in the array
eofToken[0] = new EOFToken();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_enqueue_eof_token__FINEST",
new Object[] { Thread.currentThread().getName(), String.valueOf(cpm.isRunning()) });
}
workQueue.enqueue(eofToken);
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_done_enqueue_eof_token__FINEST",
new Object[] { Thread.currentThread().getName(), String.valueOf(cpm.isRunning()) });
}
// synchronized (workQueue) { // redundant, the enqueue call above does this
// workQueue.notifyAll();
// }
} catch (Exception e) {
e.printStackTrace();
if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
new Object[] { Thread.currentThread().getName(), e.getMessage() });
UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, "", e);
}
}
}
public String getLastDocId() {
if (lastDocId != null && lastDocId.length > 0) {
return lastDocId[0];
} else {
return "N/A";
}
}
public void invalidate(CAS[] aCasList) {
for (int i = 0; aCasList != null && i < aCasList.length && aCasList[i] != null; i++) {
ChunkMetadata meta = CPMUtils.getChunkMetadata(aCasList[i]);
// Add the docId into a cache of documents that have been dropped
// due to exception
// during processing. This is only done for chunked documents
// (sequence > 0)
if (meta != null && meta.getSequence() > 0 && !timedoutDocs.containsKey(meta.getDocId())) {
timedoutDocs.put(meta.getDocId(), meta.getDocId());
}
}
}
}