| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.uima.collection.impl.cpm; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import org.apache.uima.UIMAFramework; |
| import org.apache.uima.UIMARuntimeException; |
| import org.apache.uima.collection.StatusCallbackListener; |
| import org.apache.uima.collection.base_cpm.AbortCPMException; |
| import org.apache.uima.collection.base_cpm.BaseCPM; |
| import org.apache.uima.collection.base_cpm.BaseCollectionReader; |
| import org.apache.uima.collection.base_cpm.BaseStatusCallbackListener; |
| import org.apache.uima.collection.base_cpm.CasProcessor; |
| import org.apache.uima.collection.base_cpm.RecoverableCollectionReader; |
| import org.apache.uima.collection.base_cpm.SynchPoint; |
| import org.apache.uima.collection.impl.EntityProcessStatusImpl; |
| import org.apache.uima.collection.impl.base_cpm.container.ProcessingContainer; |
| import org.apache.uima.collection.impl.cpm.container.CPEFactory; |
| import org.apache.uima.collection.impl.cpm.container.deployer.socket.ProcessControllerAdapter; |
| import org.apache.uima.collection.impl.cpm.engine.CPMEngine; |
| import org.apache.uima.collection.impl.cpm.engine.CPMThreadGroup; |
| import org.apache.uima.collection.impl.cpm.utils.CPMUtils; |
| import org.apache.uima.collection.impl.cpm.utils.CasMetaData; |
| import org.apache.uima.collection.impl.cpm.utils.CpmLocalizedMessage; |
| import org.apache.uima.collection.impl.cpm.utils.TimerFactory; |
| import org.apache.uima.collection.metadata.CpeConfiguration; |
| import org.apache.uima.collection.metadata.CpeDescription; |
| import org.apache.uima.resource.ResourceConfigurationException; |
| import org.apache.uima.resource.ResourceInitializationException; |
| import org.apache.uima.resource.ResourceManager; |
| import org.apache.uima.resource.metadata.NameValuePair; |
| import org.apache.uima.util.Level; |
| import org.apache.uima.util.ProcessTrace; |
| import org.apache.uima.util.ProcessTraceEvent; |
| import org.apache.uima.util.Progress; |
| import org.apache.uima.util.UimaTimer; |
| import org.apache.uima.util.impl.ProcessTrace_impl; |
| |
| |
| /** |
| * Main thread that launches CPE and manages it. An application interacts with the running CPE via |
| * this object. Through an API, an application may start, pause, resume, and stop a CPE. |
| * |
| * |
| */ |
| public class BaseCPMImpl implements BaseCPM, Runnable { |
| |
| /** Value of the default-process-trace flag passed to the constructors. */ |
| private boolean defaultProcessTrace; |
| |
| /** Engine created in {@code init}; most public methods of this class delegate to it. */ |
| private CPMEngine cpEngine; |
| |
| /** Process trace created in {@code init} and handed to the engine and the thread group. */ |
| private ProcessTrace procTr; |
| |
| /** Collection reader to process; set by callers or fetched lazily from the CPE factory. */ |
| BaseCollectionReader collectionReader; |
| |
| /** Checkpoint helper; stays {@code null} unless the CPE configuration names a checkpoint file. */ |
| private Checkpoint checkpoint; |
| |
| /** State restored from an existing checkpoint file, or {@code null} if nothing was restored. */ |
| private CheckpointData checkpointData; |
| |
| /** Number of entities to process; -1 means all. */ |
| private long num2Process = -1; |
| |
| /** Set by kill/stop/asynchStop and by {@code run()} when processing fails or the engine is killed. */ |
| private boolean killed; |
| |
| /** Set by {@code run()} once the engine thread has been joined. */ |
| private boolean completed; |
| |
| /** Factory that parses the CPE descriptor and supplies configuration and components. */ |
| private CPEFactory cpeFactory; |
| |
| /** Report style flag; when {@code false}, {@code run()} records a "CPM PROCESSING TIME" event. */ |
| private boolean useJediiReport = true; |
| |
| /** Event type to display name mapping, (re)built by {@code setJediiReport}. */ |
| private Map mEventTypeMap; |
| |
| /** Thread group passed to the engine and configured by the {@code process} methods. */ |
| public CPMThreadGroup cpmThreadGroup; |
| |
| /** |
|  * Creates a CPM for the given parsed CPE descriptor, with no explicit resource manager |
|  * ({@code null}), the default process trace enabled and the framework's default performance |
|  * tuning properties. |
|  * |
|  * @param aDescriptor |
|  *          parsed CPE descriptor |
|  * @throws Exception |
|  *           if the delegated constructor fails |
|  */ |
| public BaseCPMImpl(CpeDescription aDescriptor) throws Exception { |
|   // The delegated constructor already creates the thread group and passes it to the engine in |
|   // init(). It must not be replaced afterwards: a second group would leave this object |
|   // configuring (in process()) a group the engine does not use. |
|   this(aDescriptor, null, true, UIMAFramework.getDefaultPerformanceTuningProperties()); |
| } |
| |
| /** |
|  * Creates the CPE factory for the given descriptor, creates the CPM thread group and runs |
|  * {@link #init(boolean, Properties)} with the CAS processors from the factory enabled. |
|  * |
|  * @param aDescriptor |
|  *          parsed CPE descriptor |
|  * @param aResourceManager |
|  *          ResourceManager instance passed to the CPE factory |
|  * @param aDefaultProcessTrace |
|  *          value stored as this object's default-process-trace flag |
|  * @param aProps |
|  *          properties forwarded to {@code init} |
|  * @throws Exception |
|  *           if the factory cannot be created or initialization fails |
|  */ |
| public BaseCPMImpl(CpeDescription aDescriptor, ResourceManager aResourceManager, |
|         boolean aDefaultProcessTrace, Properties aProps) throws Exception { |
|   this.cpeFactory = new CPEFactory(aDescriptor, aResourceManager); |
|   this.cpmThreadGroup = new CPMThreadGroup("CPM Thread Group"); |
|   this.defaultProcessTrace = aDefaultProcessTrace; |
|   // false: do add the factory's CAS processors to the engine |
|   init(false, aProps); |
| } |
| |
| /** |
| * Parses CPE descriptor. |
| * |
| * @param mode - |
| * indicates if the CPM should use a static descriptor or one provided |
| * @param aDescriptor - |
| * provided descriptor path |
| * @param aResourceManager ResourceManager to be used by CPM |
| * @throws Exception - |
| */ |
| public BaseCPMImpl(Boolean mode, String aDescriptor, ResourceManager aResourceManager) |
| throws Exception { |
| cpmThreadGroup = new CPMThreadGroup("CPM Thread Group"); |
| cpeFactory = new CPEFactory(aResourceManager); |
| if (mode == null) { |
| defaultProcessTrace = true; |
| cpeFactory.parse(); |
| } else { |
| defaultProcessTrace = mode; |
| cpeFactory.parse(aDescriptor); |
| } |
| init(mode == null, UIMAFramework.getDefaultPerformanceTuningProperties()); |
| } |
| |
| /** |
|  * Passes custom performance tuning settings on to the CPM engine. |
|  * |
|  * @param aPerformanceTuningSettings |
|  *          the performance tuning settings to forward |
|  */ |
| public void setPerformanceTuningSettings(Properties aPerformanceTuningSettings) { |
|   this.cpEngine.setPerformanceTuningSettings(aPerformanceTuningSettings); |
| } |
| |
| /** |
|  * Hands the given {@link ProcessControllerAdapter} to the CPM engine. According to the original |
|  * documentation the CPM uses this adapter to request Cas Processor restarts and shutdown. |
|  * |
|  * @param aPca |
|  *          the ProcessControllerAdapter instance to forward |
|  */ |
| public void setProcessControllerAdapter(ProcessControllerAdapter aPca) { |
|   this.cpEngine.setProcessControllerAdapter(aPca); |
| } |
| |
| /** |
| * Sets Jedii-style reporting resources and sets the global flag to indicate what report-style to |
| * use at the end of processing. Jedii-style reporting shows a summary for this run. The CPM |
| * default report shows more detail information. |
| * |
| * @param aUseJediiReport the new jedii report |
| */ |
| public void setJediiReport(boolean aUseJediiReport) { |
| mEventTypeMap = new HashMap(); |
| mEventTypeMap.put(ProcessTraceEvent.ANALYSIS_ENGINE, "TAE"); |
| mEventTypeMap.put(ProcessTraceEvent.ANALYSIS, "Annotator"); |
| mEventTypeMap.put("CAS_PROCESSOR", "CAS Consumer"); |
| useJediiReport = aUseJediiReport; |
| } |
| |
| /** |
|  * Instantiates and initializes a CPE: sets up the timer and the process trace, restores state |
|  * from an existing checkpoint file when one is configured, creates the {@link CPMEngine}, and |
|  * configures its CAS pool size, queue sizes and thread count from the CPE descriptor. |
|  * |
|  * @param aDummyCasProcessor |
|  *          when {@code true}, the CAS processors supplied by the CPE factory are not added to |
|  *          the engine |
|  * @param aProps |
|  *          properties passed to the {@link ProcessTrace_impl} constructor |
|  * @throws Exception |
|  *           if a queue size or the thread count cannot be read from the descriptor (the |
|  *           {@link NumberFormatException} is attached as the cause), or if a called component |
|  *           fails |
|  */ |
| public void init(boolean aDummyCasProcessor, Properties aProps) throws Exception { |
|   String uimaTimerClass = cpeFactory.getCPEConfig().getTimerImpl(); |
|   try { |
|     new TimerFactory(uimaTimerClass); |
|   } catch (Exception e) { |
|     // Deliberately not fatal: TimerFactory.getTimer() is still consulted below. |
|     if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) { |
|       UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(), |
|               "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_use_default_timer__FINEST", |
|               new Object[] { Thread.currentThread().getName() }); |
|     } |
|   } |
|   UimaTimer uimaTimer = TimerFactory.getTimer(); |
|   if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) { |
|     UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(), |
|             "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_use_custom_timer__FINEST", |
|             new Object[] { Thread.currentThread().getName(), uimaTimer.getClass().getName() }); |
|   } |
| |
|   procTr = new ProcessTrace_impl(uimaTimer, aProps); |
| |
|   // Name of the checkpoint file configured for the CPM, if any. |
|   String checkpointFileName = null; |
|   if (cpeFactory.getCPEConfig().getCheckpoint() != null |
|           && cpeFactory.getCPEConfig().getCheckpoint().getFilePath() != null) { |
|     checkpointFileName = cpeFactory.getCPEConfig().getCheckpoint().getFilePath(); |
|   } |
|   if (checkpointFileName != null && checkpointFileName.trim().length() > 0) { |
|     File checkpointFile = new File(checkpointFileName); |
|     checkpoint = new Checkpoint(this, checkpointFileName, cpeFactory.getCPEConfig() |
|             .getCheckpoint().getFrequency()); |
|     // An existing checkpoint file means the previous run did not complete successfully, so the |
|     // CPM starts in recovery mode and the restored data is handed to the engine below. |
|     if (checkpointFile.exists()) { |
|       try { |
|         Object restoredObject = checkpoint.restoreFromCheckpoint(); |
|         if (restoredObject instanceof CheckpointData) { |
|           checkpointData = (CheckpointData) restoredObject; |
|         } |
|       } catch (Exception e) { |
|         // Still best effort (processing continues without restored state), but report the |
|         // failure through the framework logger instead of printing to stderr. |
|         UIMAFramework.getLogger(this.getClass()).log(Level.WARNING, |
|                 "Unable to restore CPM state from checkpoint file " + checkpointFileName, e); |
|       } |
|     } |
|   } |
| |
|   // Instantiate class responsible for processing |
|   cpEngine = new CPMEngine(cpmThreadGroup, cpeFactory, procTr, checkpointData); |
|   if (!aDummyCasProcessor) { |
|     int concurrentThreadCount = cpeFactory.getCpeDescriptor().getCpeCasProcessors() |
|             .getConcurrentPUCount(); |
|     // The factory is asked for its CAS processors once per configured processing thread. |
|     for (int threadCount = 0; threadCount < concurrentThreadCount; threadCount++) { |
|       CasProcessor[] casProcessors = cpeFactory.getCasProcessors(); |
|       for (int i = 0; i < casProcessors.length; i++) { |
|         if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) { |
|           UIMAFramework.getLogger(this.getClass()).logrb( |
|                   Level.CONFIG, |
|                   this.getClass().getName(), |
|                   "process", |
|                   CPMUtils.CPM_LOG_RESOURCE_BUNDLE, |
|                   "UIMA_CPM_add_cp__CONFIG", |
|                   new Object[] { Thread.currentThread().getName(), |
|                       casProcessors[i].getProcessingResourceMetaData().getName() }); |
|         } |
|         addCasProcessor(casProcessors[i]); |
|       } |
|     } |
|   } |
| |
|   int casPoolSize = 0; |
|   try { |
|     casPoolSize = cpeFactory.getCpeDescriptor().getCpeCasProcessors().getCasPoolSize(); |
|     casPoolSize = (casPoolSize == -1) ? 0 : casPoolSize; |
|     cpEngine.setPoolSize(casPoolSize); |
|   } catch (NumberFormatException ignored) { |
|     // PoolSize is an optional parameter. If it is missing from the CPE descriptor, |
|     // casPoolSize stays 0 and the queue sizes below are read from the descriptor instead. |
|   } |
| |
|   try { |
|     int iqSize = 0; |
|     if (casPoolSize == 0) { |
|       iqSize = cpeFactory.getCpeDescriptor().getCpeCasProcessors().getInputQueueSize(); |
|     } |
|     cpEngine.setInputQueueSize(casPoolSize == 0 ? iqSize : casPoolSize); |
|   } catch (NumberFormatException e) { |
|     throw new Exception(CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE, |
|             "UIMA_CPM_EXP_queue_size_not_defined__WARNING", new Object[] { |
|                 Thread.currentThread().getName(), "inputQueueSize" }), e); |
|   } |
|   try { |
|     int oqSize = 0; |
|     if (casPoolSize == 0) { |
|       oqSize = cpeFactory.getCpeDescriptor().getCpeCasProcessors().getOutputQueueSize(); |
|     } |
|     cpEngine.setOutputQueueSize(casPoolSize == 0 ? oqSize : casPoolSize + 2); |
|   } catch (NumberFormatException e) { |
|     throw new Exception(CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE, |
|             "UIMA_CPM_EXP_queue_size_not_defined__WARNING", new Object[] { |
|                 Thread.currentThread().getName(), "outputQueueSize" }), e); |
|   } |
|   try { |
|     int threadCount = cpeFactory.getCpeDescriptor().getCpeCasProcessors().getConcurrentPUCount(); |
|     cpEngine.setConcurrentThreadSize(threadCount); |
|   } catch (NumberFormatException e) { |
|     throw new Exception(CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE, |
|             "UIMA_CPM_EXP_invalid_component_reference__WARNING", new Object[] { |
|                 Thread.currentThread().getName(), "casProcessors", "processingUnitThreadCount" }), |
|             e); |
|   } |
| } |
| |
| /** |
|  * Returns the {@link CpeConfiguration} object that the CPE factory currently holds. |
|  * |
|  * @return the CPE configuration obtained from the CPE factory |
|  * @throws Exception |
|  *           if the factory cannot provide the configuration |
|  */ |
| public CpeConfiguration getCPEConfig() throws Exception { |
|   final CpeConfiguration currentConfig = cpeFactory.getCPEConfig(); |
|   return currentConfig; |
| } |
| |
| /** |
|  * Returns the CasProcessors reported by the engine. Never returns {@code null}: an empty array |
|  * is returned when the engine has none to report. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#getCasProcessors() |
|  */ |
| @Override |
| public CasProcessor[] getCasProcessors() { |
|   CasProcessor[] fromEngine = cpEngine.getCasProcessors(); |
|   if (fromEngine == null) { |
|     return new CasProcessor[0]; |
|   } |
|   return fromEngine; |
| } |
| |
| /** |
|  * Adds the given CasProcessor to the processing pipeline by delegating to the engine's |
|  * single-argument {@code addCasProcessor}. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor) |
|  */ |
| @Override |
| public void addCasProcessor(CasProcessor aCasProcessor) throws ResourceConfigurationException { |
|   this.cpEngine.addCasProcessor(aCasProcessor); |
| } |
| |
| /** |
|  * Adds the given CasProcessor to the processing pipeline at the requested position by |
|  * delegating to the engine. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor, |
|  *      int) |
|  */ |
| @Override |
| public void addCasProcessor(CasProcessor aCasProcessor, int aIndex) |
|         throws ResourceConfigurationException { |
|   this.cpEngine.addCasProcessor(aCasProcessor, aIndex); |
| } |
| |
| /** |
|  * Removes a CasProcessor from the processing pipeline. |
|  * <p> |
|  * NOTE(review): the {@code aCasProcessor} argument is not used; the engine is always asked to |
|  * remove the entry at index 0. Confirm whether the index of the given processor should be |
|  * looked up instead. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#removeCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor) |
|  */ |
| @Override |
| public void removeCasProcessor(CasProcessor aCasProcessor) { |
|   final int indexToRemove = 0; |
|   cpEngine.removeCasProcessor(indexToRemove); |
| } |
| |
| /** |
|  * Asks the engine to disable the CasProcessor with the given name. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(java.lang.String) |
|  */ |
| @Override |
| public void disableCasProcessor(String aCasProcessorName) { |
|   this.cpEngine.disableCasProcessor(aCasProcessorName); |
| } |
| |
| /** |
|  * Asks the engine to enable the CasProcessor with the given name. |
|  * |
|  * @param aCasProcessorName |
|  *          name of the CasProcessor to enable |
|  */ |
| public void enableCasProcessor(String aCasProcessorName) { |
|   this.cpEngine.enableCasProcessor(aCasProcessorName); |
| } |
| |
| /** |
|  * Always reports that serial processing is not required. |
|  * |
|  * @return {@code false}, unconditionally |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#isSerialProcessingRequired() |
|  */ |
| @Override |
| public boolean isSerialProcessingRequired() { |
|   final boolean serialRequired = false; |
|   return serialRequired; |
| } |
| |
| /** |
|  * Does nothing; the requested value is ignored by this implementation. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#setSerialProcessingRequired(boolean) |
|  */ |
| @Override |
| public void setSerialProcessingRequired(boolean aRequired) { |
|   // intentionally empty |
| } |
| |
| /** |
|  * Reports whether the engine is configured to pause on exception. |
|  * |
|  * @return the value reported by the engine |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#isPauseOnException() |
|  */ |
| @Override |
| public boolean isPauseOnException() { |
|   final boolean pausesOnException = cpEngine.isPauseOnException(); |
|   return pausesOnException; |
| } |
| |
| /** |
|  * Tells the engine whether it should pause on exception. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#setPauseOnException(boolean) |
|  */ |
| @Override |
| public void setPauseOnException(boolean aPause) { |
|   this.cpEngine.setPauseOnException(aPause); |
| } |
| |
| /** |
|  * Registers an event listener with the engine. {@code run()} notifies the engine's listeners |
|  * when processing completes, is aborted, or fails. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener) |
|  */ |
| @Override |
| public void addStatusCallbackListener(BaseStatusCallbackListener aListener) { |
|   this.cpEngine.addStatusCallbackListener(aListener); |
| } |
| |
| /** |
|  * Removes the given listener from the engine's listener list. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#removeStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener) |
|  */ |
| @Override |
| public void removeStatusCallbackListener(BaseStatusCallbackListener aListener) { |
|   this.cpEngine.removeStatusCallbackListener(aListener); |
| } |
| |
| /* (non-Javadoc) |
| * @see java.lang.Runnable#run() |
| */ |
| /* |
| * Starting point for the CPE. Before starting processing, the CPE must deploy all CasProcessors. |
| * Once all are deployed the processing begins in Worker Thread called CPEngine. This thread |
| * blocks until the CPEngine thread completes. CPMWorker Thread finishes. |
| * |
| * @see java.lang.Runnable#run() |
| */ |
| @Override |
| public void run() { |
| long start, end; |
| // name this thread |
| Thread.currentThread().setName("BaseCPMImpl-Thread"); |
| |
| start = System.currentTimeMillis(); |
| if (!useJediiReport) { |
| procTr.startEvent("CPM", "CPM PROCESSING TIME", ""); |
| } |
| |
| // Specify how docs to process |
| cpEngine.setNumToProcess(num2Process); |
| try { |
| // Deploy all CAS processors |
| cpEngine.deployCasProcessors(); |
| cpEngine.setCollectionReader(collectionReader); |
| |
| // Start the checkpoint thread |
| if (checkpoint != null) { |
| new Thread(checkpoint).start(); |
| } |
| cpEngine.start(); |
| // Joing the CPMWorker Thread and wait until it finishes |
| cpEngine.join(); |
| |
| completed = true; |
| // If the entire collection has been processed there is no need for a checkpoint. |
| // Delete it, there is no need to recover anything. Otherwise, the CPM has been killed |
| // and may need to be recovered. Checkpoint file contains the status of the CPM, including |
| // last document processed, status of all CasProcessors along with all counts and totals. |
| if (!killed && checkpoint != null) { |
| checkpoint.stop(); |
| checkpoint.delete(); |
| checkpoint = null; |
| } |
| // Terminate all threads and running services |
| cpEngine.stopCasProcessors(false); |
| // Notify Listeners that the processing pipeline has finished |
| if (!useJediiReport) { |
| procTr.endEvent("CPM", "CPM PROCESSING TIME", "success"); |
| } |
| end = System.currentTimeMillis(); |
| if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(), |
| "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, |
| "UIMA_CPM_show_total_time_in_cpm__FINEST", |
| new Object[] { Thread.currentThread().getName(), String.valueOf(end - start) }); |
| } |
| } catch (AbortCPMException e) { |
| if (!useJediiReport) { |
| procTr.endEvent("CPM", "CPM PROCESSING TIME", "failed"); |
| } |
| // Terminate all threads and running services |
| try { |
| cpEngine.stopCasProcessors(true); |
| } catch (Exception ex) { |
| UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, "" + ex); |
| } |
| killed = true; |
| } catch (Exception e) { |
| if (!useJediiReport) { |
| procTr.endEvent("CPM", "CPM PROCESSING TIME", "failed"); |
| } |
| UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, "" + e); |
| killed = true; |
| ArrayList statusCbL = cpEngine.getCallbackListeners(); |
| EntityProcessStatusImpl enProcSt = new EntityProcessStatusImpl(procTr, true); |
| // e is the actual exception. |
| enProcSt.addEventStatus("CPM", "Failed", e); |
| |
| // Notify all listeners that the CPM has finished processing |
| for (int j = 0; j < statusCbL.size(); j++) { |
| BaseStatusCallbackListener st = (BaseStatusCallbackListener) statusCbL.get(j); |
| if (st != null && st instanceof StatusCallbackListener) { |
| ((StatusCallbackListener) st).entityProcessComplete(null, enProcSt); |
| } |
| } |
| } |
| |
| if (cpEngine.isKilled()) { |
| killed = true; |
| } |
| |
| if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(), |
| "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cpm_stopped__FINEST", |
| new Object[] { Thread.currentThread().getName(), String.valueOf(killed) }); |
| } |
| ArrayList statusCbL = cpEngine.getCallbackListeners(); |
| // Notify all listeners that the CPM has finished processing |
| for (int j = 0; j < statusCbL.size(); j++) { |
| BaseStatusCallbackListener st = (BaseStatusCallbackListener) statusCbL.get(j); |
| if ( st != null ) { |
| if (!killed) { |
| st.collectionProcessComplete(); |
| } else { |
| st.aborted(); |
| } |
| } |
| } |
| } |
| |
| /** |
|  * Cleans up the CPE on shutdown by asking the engine to clean up. |
|  */ |
| public void finalizeIt() { |
|   this.cpEngine.cleanup(); |
| } |
| |
| /** |
|  * Begins CPM processing with the given collection reader. The number of entities to process is |
|  * read from the CPM configuration, then a new thread running this object is created and |
|  * started. |
|  * |
|  * @param aCollectionReader |
|  *          the collection reader to process |
|  * @throws ResourceInitializationException |
|  *           if the CPM configuration cannot be obtained |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#process() |
|  * @deprecated |
|  */ |
| @Deprecated |
| public void process(BaseCollectionReader aCollectionReader) |
|         throws ResourceInitializationException { |
|   // Retrieve number of entities to process from the CPM configuration |
|   try { |
|     num2Process = cpeFactory.getCPEConfig().getNumToProcess(); |
|   } catch (InstantiationException e) { |
|     throw new ResourceInitializationException(e); |
|   } |
| |
|   collectionReader = aCollectionReader; |
|   if (cpeFactory.isDefault()) { |
|     cpeFactory.addCollectionReader(collectionReader); |
|   } |
| |
|   cpmThreadGroup.setProcessTrace(procTr); |
|   cpmThreadGroup.setListeners(cpEngine.getCallbackListeners()); |
| |
|   Thread cpmThread = new Thread(this); |
|   cpmThread.start(); |
| } |
| |
| /** |
|  * Begins processing the collection. The number of entities to process is read from the CPM |
|  * configuration and, if no collection reader has been set, one is obtained from the CPE factory. |
|  * A new thread running this object is then created in the CPM thread group and started. |
|  * |
|  * @throws ResourceInitializationException |
|  *           if the configuration or the collection reader cannot be obtained |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#process() |
|  */ |
| @Override |
| public void process() throws ResourceInitializationException { |
|   // Retrieve number of entities to process from the CPM configuration |
|   try { |
|     num2Process = cpeFactory.getCPEConfig().getNumToProcess(); |
|     if (collectionReader == null) { |
|       collectionReader = cpeFactory.getCollectionReader(); |
|     } |
|   } catch (Exception e) { |
|     // Every failure here (including InstantiationException) is reported the same way. |
|     throw new ResourceInitializationException(e); |
|   } |
|   if (cpeFactory.isDefault()) { |
|     cpeFactory.addCollectionReader(collectionReader); |
|   } |
|   cpmThreadGroup.setProcessTrace(procTr); |
|   cpmThreadGroup.setListeners(cpEngine.getCallbackListeners()); |
| |
|   // Run inside the CPM thread group, as documented and as process(reader, batchSize) does. |
|   new Thread(cpmThreadGroup, this).start(); |
| } |
| |
| /** |
|  * Begins CPM processing with the given collection reader, letting the application choose how |
|  * many entities to process. A new thread running this object is created in the CPM thread |
|  * group and started. |
|  * |
|  * @param aCollectionReader |
|  *          the collection reader to process |
|  * @param aBatchSize |
|  *          number of entities to process |
|  * @throws ResourceInitializationException |
|  *           declared for callers; not thrown directly by this method's own statements |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#process() |
|  * @deprecated |
|  */ |
| @Deprecated |
| public void process(BaseCollectionReader aCollectionReader, int aBatchSize) |
|         throws ResourceInitializationException { |
|   // The application, not the configuration, defines the size of the collection. |
|   num2Process = aBatchSize; |
| |
|   collectionReader = aCollectionReader; |
|   if (cpeFactory.isDefault()) { |
|     cpeFactory.addCollectionReader(collectionReader); |
|   } |
| |
|   cpmThreadGroup.setProcessTrace(procTr); |
|   cpmThreadGroup.setListeners(cpEngine.getCallbackListeners()); |
| |
|   Thread cpmThread = new Thread(cpmThreadGroup, this); |
|   cpmThread.start(); |
| } |
| |
| /** |
|  * Sets the Collection Reader for this CPE. When the CPE factory reports that it is the default |
|  * one, the reader is also added to the factory. |
|  * |
|  * @param aCollectionReader |
|  *          the collection reader |
|  */ |
| @Override |
| public void setCollectionReader(BaseCollectionReader aCollectionReader) { |
|   this.collectionReader = aCollectionReader; |
|   if (!cpeFactory.isDefault()) { |
|     return; |
|   } |
|   cpeFactory.addCollectionReader(this.collectionReader); |
| } |
| |
| /** |
|  * Returns the Collection Reader for this CPE, fetching it from the CPE factory (and remembering |
|  * it) on first use if none has been set. |
|  * |
|  * @return the collection reader |
|  * @throws UIMARuntimeException |
|  *           wrapping the ResourceConfigurationException raised by the factory |
|  */ |
| @Override |
| public BaseCollectionReader getCollectionReader() { |
|   if (collectionReader != null) { |
|     return collectionReader; |
|   } |
|   try { |
|     collectionReader = cpeFactory.getCollectionReader(); |
|   } catch (ResourceConfigurationException e) { |
|     throw new UIMARuntimeException(e); |
|   } |
|   return collectionReader; |
| } |
| |
| /** |
|  * Reports whether the engine is currently running. |
|  * |
|  * @return the value of the engine's {@code isRunning()} |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#isProcessing() |
|  */ |
| @Override |
| public boolean isProcessing() { |
|   final boolean engineRunning = cpEngine.isRunning(); |
|   return engineRunning; |
| } |
| |
| /** |
|  * Pauses the CPM. The engine is paused first; if a checkpoint is configured, a checkpoint is |
|  * taken and the checkpoint helper is paused as well. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#pause() |
|  */ |
| @Override |
| public void pause() { |
|   cpEngine.pauseIt(); |
|   if (checkpoint == null) { |
|     return; |
|   } |
|   checkpoint.doCheckpoint(); |
|   checkpoint.pause(); |
| } |
| |
| /** |
|  * Reports whether the engine is in the paused state. |
|  * |
|  * @return the value of the engine's {@code isPaused()} |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#isPaused() |
|  */ |
| @Override |
| public boolean isPaused() { |
|   final boolean enginePaused = cpEngine.isPaused(); |
|   return enginePaused; |
| } |
| |
| /** |
|  * Resumes the CPM. The {@code aRetryFailed} flag is ignored; this simply calls |
|  * {@link #resume()}. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#resume(boolean) |
|  */ |
| @Override |
| public void resume(boolean aRetryFailed) { |
|   this.resume(); |
| } |
| |
| /** |
|  * Resumes the CPM: resumes the engine and, if a checkpoint is configured, the checkpoint |
|  * helper too. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#resume() |
|  */ |
| @Override |
| public void resume() { |
|   cpEngine.resumeIt(); |
|   if (checkpoint == null) { |
|     return; |
|   } |
|   checkpoint.resume(); |
| } |
| |
| /** |
|  * Kills the CPM hard. Per the original documentation, CASes in transit are not processed. |
|  * Marks this CPM as killed, tells the engine to kill itself and, if a checkpoint exists and the |
|  * run has not completed, takes a final checkpoint and stops the checkpoint helper. |
|  */ |
| public void kill() { |
|   if (UIMAFramework.getLogger().isLoggable(Level.WARNING)) { |
|     UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(), |
|             "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_killing_cpm__WARNING", |
|             new Object[] { Thread.currentThread().getName() }); |
|   } |
|   killed = true; |
|   cpEngine.killIt(); |
| |
|   // Nothing more to do without a checkpoint, or once run() has completed (run() may already |
|   // have deleted the checkpoint file in that case). |
|   if (checkpoint == null || completed) { |
|     return; |
|   } |
|   checkpoint.doCheckpoint(); |
|   checkpoint.stop(); |
| } |
| |
| /** |
|  * Stops the CPM and all of its processing components. Per the original documentation, the CPM |
|  * finishes processing the CASes that are in its queues. Marks this CPM as killed, tells the |
|  * engine to stop and, if a checkpoint exists and the run has not completed, takes a final |
|  * checkpoint and stops the checkpoint helper. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#stop() |
|  */ |
| @Override |
| public void stop() { |
|   if (UIMAFramework.getLogger().isLoggable(Level.WARNING)) { |
|     UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(), |
|             "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_stop_cpm__WARNING", |
|             new Object[] { Thread.currentThread().getName() }); |
|   } |
|   killed = true; |
|   cpEngine.stopIt(); |
| |
|   // Nothing more to do without a checkpoint, or once run() has completed (run() may already |
|   // have deleted the checkpoint file in that case). |
|   if (checkpoint == null || completed) { |
|     return; |
|   } |
|   checkpoint.doCheckpoint(); |
|   checkpoint.stop(); |
| } |
| |
| /** |
|  * Stops the CPM asynchronously. Marks this CPM as killed, asks the engine for an asynchronous |
|  * stop and, if a checkpoint exists and the run has not completed, takes a final checkpoint and |
|  * stops the checkpoint helper. |
|  * <p> |
|  * The original comment marks this method as deprecated. |
|  * |
|  * @see org.apache.uima.collection.base_cpm.BaseCPM#stop() |
|  */ |
| public void asynchStop() { |
|   // Guard on the level actually used for the message (WARNING), as kill() and stop() do. |
|   // Guarding on INFO suppressed the warning whenever the logger level was set to WARNING. |
|   if (UIMAFramework.getLogger().isLoggable(Level.WARNING)) { |
|     UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(), |
|             "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_asynch_stop_cpm__WARNING", |
|             new Object[] { Thread.currentThread().getName() }); |
|   } |
|   killed = true; |
|   cpEngine.asynchStop(); |
| |
|   // If there is a checkpoint and the run has not completed, take the next checkpoint and stop |
|   // the checkpoint helper. run() may delete the checkpoint file when the CPM completes. |
|   if (checkpoint != null && !completed) { |
|     checkpoint.doCheckpoint(); |
|     checkpoint.stop(); |
|   } |
| } |
| |
| /** |
| * Decode status. |
| * |
| * @param aStatus the a status |
| * @return the string |
| */ |
| /* |
| * Returns a String describing a given CASProcessor state. |
| * |
| * @param aStatus - status of the CasProcessor @return - String corresponding to a given state of |
| * the CasProcessor |
| */ |
| private String decodeStatus(int aStatus) { |
| try { |
| switch (aStatus) { |
| case Constants.CAS_PROCESSOR_COMPLETED: |
| return Constants.COMPLETED; |
| case Constants.CAS_PROCESSOR_DISABLED: |
| return Constants.DISABLED; |
| case Constants.CAS_PROCESSOR_READY: |
| return Constants.READY; |
| case Constants.CAS_PROCESSOR_RUNNING: |
| return Constants.RUNNING; |
| case Constants.CAS_PROCESSOR_KILLED: |
| return Constants.KILLED; |
| } |
| } catch (NumberFormatException e) { |
| e.printStackTrace(); |
| } |
| return Constants.UNKNOWN; |
| } |
| |
| /** |
| * Copy component events. |
| * |
| * @param aEvType the a ev type |
| * @param aList the a list |
| * @param aPTr the a P tr |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| /* |
| * Copies events of a given type found in the list to a provided ProcessTrace instance |
| * |
| * @param - aEvType, event type to copy from the list @param - List, list of events @param |
| * ProcessTrace, where to copy events of a given type |
| * |
| */ |
| private void copyComponentEvents(String aEvType, List aList, ProcessTrace aPTr) |
| throws IOException { |
| for (int i = 0; i < aList.size(); i++) { |
| ProcessTraceEvent prEvent = (ProcessTraceEvent) aList.get(i); |
| if (aEvType != null && aEvType.equals(prEvent.getType())) { |
| aPTr.addEvent(prEvent); |
| } |
| } |
| |
| } |
| |
| /** |
| * Helper method to display stats and totals. |
| * |
| * @param aProcessTrace - |
| * trace containing stats |
| * @param aNumDocsProcessed - |
| * number of entities processed so far |
| */ |
| public void displayStats(ProcessTrace aProcessTrace, int aNumDocsProcessed) { |
| if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, |
| "Documents Processed: " + aNumDocsProcessed); |
| } |
| // count total time |
| int totalTime = 0; |
| Iterator it = aProcessTrace.getEvents().iterator(); |
| while (it.hasNext()) { |
| ProcessTraceEvent event = (ProcessTraceEvent) it.next(); |
| // Dont add total time the CPM ran for. Just add all of the times of all components to |
| // get the time. |
| if ("CPM".equals(event.getComponentName())) { |
| if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(this.getClass()).log( |
| Level.FINEST, |
| "Current Component::" + event.getComponentName() + " Time::" |
| + event.getDuration()); |
| } |
| continue; |
| } |
| if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, |
| "Current Component::" + event.getComponentName()); |
| } |
| totalTime += event.getDuration(); |
| } |
| float totalTimeSeconds = (float) totalTime / 1000; |
| if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, |
| "Total Time: " + totalTimeSeconds + " seconds"); |
| } |
| |
| // create root tree node |
| if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, |
| "100% (" + totalTime + "ms) - Collection Processing Engine"); |
| } |
| // build tree |
| it = aProcessTrace.getEvents().iterator(); |
| while (it.hasNext()) { |
| ProcessTraceEvent event = (ProcessTraceEvent) it.next(); |
| buildEventTree(event, totalTime); |
| } |
| } |
| |
| /** |
| * Helper method to help build the CPM report. |
| * |
| * @param aEvent the a event |
| * @param aTotalTime the a total time |
| */ |
| public void buildEventTree(ProcessTraceEvent aEvent, int aTotalTime) { |
| // Skip reporting the CPM time.This time has already been acquired by summing up |
| // times from all individual components |
| if ("CPM".equals(aEvent.getComponentName())) { |
| return; |
| } |
| |
| int duration = aEvent.getDuration(); |
| float pct = (float) ((duration * 100 * 10) / aTotalTime) / 10; |
| |
| String type = (String) mEventTypeMap.get(aEvent.getType()); |
| if (type == null) { |
| type = aEvent.getType(); |
| } |
| |
| if (System.getProperty("DEBUG") != null) { |
| UIMAFramework.getLogger(this.getClass()).log( |
| Level.FINEST, |
| "" + pct + "% (" + duration + "ms) - " + aEvent.getComponentName() + " (" + type |
| + ")"); |
| } |
| Iterator it = aEvent.getSubEvents().iterator(); |
| while (it.hasNext()) { |
| ProcessTraceEvent event = (ProcessTraceEvent) it.next(); |
| buildEventTree(event, aTotalTime); |
| } |
| } |
| |
| /** |
| * Returns PerformanceReport for the CPM. This report contains a snapshot of the CPM state. |
| * |
| * @return the performance report |
| */ |
| @Override |
| public ProcessTrace getPerformanceReport() { |
| Map perfReport = cpEngine.getStats(); |
| Progress[] colReaderProgress = (Progress[]) perfReport.get("COLLECTION_READER_PROGRESS"); |
| |
| ProcessTrace processTrace = new ProcessTrace_impl(cpEngine.getPerformanceTuningSettings()); |
| if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, |
| "-------------------------------------------"); |
| } |
| if (useJediiReport) { |
| try { |
| synchronized (procTr) { |
| List eventList = procTr.getEvents(); |
| |
| for (int j = 0; eventList != null && j < eventList.size(); j++) { |
| ProcessTraceEvent prEvent = (ProcessTraceEvent) eventList.get(j); |
| processTrace.addEvent(prEvent); |
| } |
| } |
| return processTrace; |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| if (defaultProcessTrace) { |
| createDefaultProcessTrace(getCasProcessors(), procTr, processTrace); |
| |
| return processTrace; |
| } |
| try { |
| |
| // To facilitate recovery from CPM untimely shutdown ( due to external STOP), it must |
| // have access to last entity id. CollectionReader will use this marker to synch itself |
| // up to the last known entity before the CPM shut itself down. More complicated |
| // recovery mechanism may be supported as long as CAS METADATA contains information |
| // about the last known point. For example, in case WF Large Store the cas must |
| // hold the entire EDATA frame, containing restart information. Its up to the |
| // CollectionReader to know what part of the CAS Metadata should be used for recovery. |
| // The "last cas" CasMetaData is added after each successfull read from the CollectionReader |
| // by the cpEngine in its run() processing loop. |
| CasMetaData casMetaData = (CasMetaData) perfReport.get("CPM_LAST_CAS_METADATA"); |
| if (casMetaData != null) { |
| NameValuePair[] nvp = casMetaData.getCasMetaData(); |
| StringBuffer sb = new StringBuffer(); |
| for (int i = 0; i < nvp.length && nvp[i] != null; i++) { |
| if (i != 0) { |
| // Add separator between name-value pairs. StringTokenizer will parse this string |
| // and will use the separator to extract nvp. |
| sb.append(","); |
| } |
| sb.append(nvp[i].getName() + "=" + (String) nvp[i].getValue()); |
| } |
| processTrace.addEvent("CPM", "CPM_LAST_CAS_METADATA", sb.toString(), 0, null); |
| } |
| List eList = null; |
| synchronized (procTr) { |
| eList = procTr.getEventsByComponentName("CPM", true); |
| } |
| if (!useJediiReport) { |
| copyComponentEvents("CPM PROCESSING TIME", eList, processTrace); |
| } |
| eList.clear(); |
| if (colReaderProgress != null) { |
| Long totalCollectionReaderTime = (Long) perfReport.get("COLLECTION_READER_TIME"); |
| String readerName = collectionReader.getProcessingResourceMetaData().getName(); |
| if (totalCollectionReaderTime != null) { |
| processTrace.addEvent(readerName, "COLLECTION_READER_TIME", String |
| .valueOf(totalCollectionReaderTime), 0, null); |
| } |
| for (int i = 0; i < colReaderProgress.length; i++) { |
| if (Progress.BYTES.equals(colReaderProgress[i].getUnit())) { |
| processTrace.addEvent(readerName, Constants.COLLECTION_READER_BYTES_PROCESSED, String |
| .valueOf(colReaderProgress[i].getCompleted()), 0, null); |
| } else if (Progress.ENTITIES.equals(colReaderProgress[i].getUnit())) { |
| processTrace.addEvent(readerName, Constants.COLLECTION_READER_DOCS_PROCESSED, String |
| .valueOf(colReaderProgress[i].getCompleted()), 0, null); |
| } |
| } |
| |
| synchronized (procTr) { |
| eList = procTr.getEventsByComponentName(readerName, true); |
| } |
| copyComponentEvents("COLLECTION READER PROCESSING TIME", eList, processTrace); |
| eList.clear(); |
| processTrace.addEvent(readerName, "Last Entity ID Read", cpEngine.getLastProcessedDocId(), |
| 0, null); |
| } |
| |
| LinkedList processors = cpEngine.getAllProcessingContainers(); |
| for (int i = 0; i < processors.size(); i++) { |
| ProcessingContainer container = (ProcessingContainer) processors.get(i); |
| synchronized (procTr) { |
| eList = procTr.getEventsByComponentName(container.getName(), true); |
| } |
| copyComponentEvents("Process", eList, processTrace); |
| |
| processTrace.addEvent(container.getName(), "Documents Processed", String.valueOf(container |
| .getProcessed()), 0, null); |
| String status = decodeStatus(container.getStatus()); |
| processTrace.addEvent(container.getName(), "Processor Status", status, 0, null); |
| |
| long bytesIn = container.getBytesIn(); |
| processTrace.addEvent(container.getName(), "Processor BYTESIN", String.valueOf(bytesIn), 0, |
| null); |
| |
| long bytesOut = container.getBytesOut(); |
| processTrace.addEvent(container.getName(), "Processor BYTESOUT", String.valueOf(bytesOut), |
| 0, null); |
| |
| int restartCount = container.getRestartCount(); |
| processTrace.addEvent(container.getName(), "Processor Restarts", String |
| .valueOf(restartCount), 0, null); |
| |
| int retryCount = container.getRetryCount(); |
| processTrace.addEvent(container.getName(), "Processor Retries", String.valueOf(retryCount), |
| 0, null); |
| |
| int filteredCount = container.getFilteredCount(); |
| processTrace.addEvent(container.getName(), "Filtered Entities", String |
| .valueOf(filteredCount), 0, null); |
| |
| long remainingCount = container.getRemaining(); |
| processTrace.addEvent(container.getName(), "Processor Remaining", String |
| .valueOf(remainingCount), 0, null); |
| |
| HashMap aMap = container.getAllStats(); |
| |
| if (aMap.keySet() != null) { |
| if (System.getProperty("SHOW_CUSTOM_STATS") != null) { |
| UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, "Adding Custom Stats"); |
| } |
| Iterator it = aMap.keySet().iterator(); |
| while (it != null && it.hasNext()) { |
| |
| String key = (String) it.next(); |
| if (key != null) { |
| Object o = aMap.get(key); |
| if (o instanceof String) { |
| processTrace.addEvent(container.getName(), key, (String) o, 0, null); |
| if (System.getProperty("SHOW_CUSTOM_STATS") != null) { |
| UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, |
| "Custom String Stat-" + key + " Value=" + (String) o); |
| } |
| } else if (o instanceof Integer) { |
| processTrace.addEvent(container.getName(), key, String.valueOf(((Integer) o) |
| .intValue()), 0, null); |
| if (System.getProperty("SHOW_CUSTOM_STATS") != null) { |
| UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, |
| "Custom Integer Stat-" + key + " Value=" + o); |
| } |
| } else { |
| if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(this.getClass()).log( |
| Level.FINEST, |
| "Invalid Type Found When Generating Status For " + key + ". Type::" |
| + o.getClass().getName() |
| + " Not supported. Use Integer or String instead."); |
| } |
| } |
| } |
| } |
| } |
| try { |
| String lastDocId = container.getLastProcessedEntityId(); |
| if (lastDocId != null) { |
| processTrace.addEvent(container.getName(), "Processor Last EntityId", lastDocId, 0, |
| null); |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| return processTrace; |
| } |
| |
| /** |
| * Creates the default process trace. |
| * |
| * @param aProcessors the a processors |
| * @param srcProcTr the src proc tr |
| * @param aProcessTrace the a process trace |
| */ |
| private void createDefaultProcessTrace(CasProcessor[] aProcessors, ProcessTrace srcProcTr, |
| ProcessTrace aProcessTrace) { |
| for (int i = 0; aProcessors != null && i < aProcessors.length; i++) { |
| String name = aProcessors[i].getProcessingResourceMetaData().getName(); |
| if (name == null) { |
| name = aProcessors[i].getClass().getName(); |
| } |
| synchronized (srcProcTr) { |
| |
| List eventList = srcProcTr.getEventsByComponentName(name, false); |
| for (int j = 0; j < eventList.size(); j++) { |
| ProcessTraceEvent prEvent = (ProcessTraceEvent) eventList.get(j); |
| aProcessTrace.addEvent(prEvent); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Returns current CPE progress. How many entities processed and bytes processed. |
| * |
| * @return the progress |
| */ |
| @Override |
| public Progress[] getProgress() { |
| return cpEngine.getProgress(); |
| } |
| |
| /** |
| * Returns a CPE descriptor as a String. |
| * |
| * @param aList - |
| * list of components |
| * @return - descriptor populated with a given components |
| * @throws ResourceConfigurationException the resource configuration exception |
| */ |
| public String getDescriptor(List aList) throws ResourceConfigurationException { |
| return cpeFactory.getDescriptor(aList); |
| } |
| |
| /** |
| * Returns a {@link SynchPoint} object initialized by the Collection Reader if the Collection |
| * Reader implements {@link RecoverableCollectionReader}. The synchpoint object contains the |
| * current snapshot that includes the last document processed. |
| * |
| * @return - instance of SynchPoint if the Collection Reader is recoverable, null otherwise |
| */ |
| public SynchPoint getSynchPoint() { |
| SynchPoint synchPoint = null; |
| // Check if the CR is recoverable |
| if (collectionReader != null && collectionReader instanceof RecoverableCollectionReader) { |
| synchPoint = ((RecoverableCollectionReader) collectionReader).getSynchPoint(); |
| } |
| return synchPoint; |
| } |
| } |