blob: 1ba8ce4f992e7de438b63a1446e68eb4e2090eb4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.fit.cpe;
import static org.apache.uima.UIMAFramework.getResourceSpecifierFactory;
import static org.apache.uima.UIMAFramework.produceCollectionProcessingEngine;
import static org.apache.uima.collection.impl.metadata.cpe.CpeDescriptorFactory.produceCasProcessor;
import static org.apache.uima.collection.impl.metadata.cpe.CpeDescriptorFactory.produceCollectionReader;
import static org.apache.uima.collection.impl.metadata.cpe.CpeDescriptorFactory.produceDescriptor;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.util.Map;
import org.apache.uima.analysis_engine.AnalysisEngineDescription;
import org.apache.uima.analysis_engine.metadata.FixedFlow;
import org.apache.uima.collection.CollectionProcessingEngine;
import org.apache.uima.collection.CollectionReaderDescription;
import org.apache.uima.collection.StatusCallbackListener;
import org.apache.uima.collection.impl.metadata.CpeDefaultValues;
import org.apache.uima.collection.metadata.CpeCollectionReader;
import org.apache.uima.collection.metadata.CpeComponentDescriptor;
import org.apache.uima.collection.metadata.CpeDescription;
import org.apache.uima.collection.metadata.CpeDescriptorException;
import org.apache.uima.collection.metadata.CpeInclude;
import org.apache.uima.collection.metadata.CpeIntegratedCasProcessor;
import org.apache.uima.fit.internal.ResourceManagerFactory;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceManager;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.util.InvalidXMLException;
import org.xml.sax.SAXException;
/**
 * Build a Collection Processing Engine from a {@link CollectionReaderDescription} and a
 * {@link AnalysisEngineDescription}. If an aggregate analysis engine description is used, the
 * builder will add each child of the aggregate engine as one processor to the engine. This works
 * only for aggregate analysis engines using a {@link FixedFlow}.
 * <p>
 * Typical usage: call {@link #setReader(CollectionReaderDescription)} and
 * {@link #setAnalysisEngine(AnalysisEngineDescription)}, optionally
 * {@link #setMaxProcessingUnitThreadCount(int)}, and finally
 * {@link #createCpe(StatusCallbackListener)}.
 */
public class CpeBuilder {
  /** Action configured on every CAS processor created by this builder. */
  private static final String ACTION_ON_MAX_ERROR = "terminate";

  /** Number of CASes added to the processing unit thread count when sizing the CAS pool. */
  private static final int CAS_POOL_HEADROOM = 2;

  /**
   * used for calculating the CAS pool size which needs to be adjusted to the number of parallel
   * pipelines
   */
  private int maxProcessingUnitThreadCount = 1;

  /** The CPE descriptor that is incrementally populated by the setters of this builder. */
  private final CpeDescription cpeDesc = produceDescriptor();

  /**
   * Set the maximum number of threads the CPE may use. A value of {@code 0} leaves the processing
   * unit thread count of the CPE descriptor untouched when the CPE is created.
   *
   * @param aMaxProcessingUnitThreadCount
   *          the maximum number of threads the CPE may use; must not be negative.
   * @throws IllegalArgumentException
   *           if the given value is negative.
   */
  public void setMaxProcessingUnitThreadCount(int aMaxProcessingUnitThreadCount) {
    // A negative count would later yield a non-positive CAS pool size and a negative thread
    // count in createCpe(), so reject it here where the cause is obvious.
    if (aMaxProcessingUnitThreadCount < 0) {
      throw new IllegalArgumentException(
              "Maximum processing unit thread count must not be negative, but was: "
                      + aMaxProcessingUnitThreadCount);
    }
    maxProcessingUnitThreadCount = aMaxProcessingUnitThreadCount;
  }

  /**
   * Set the reader used by the CPE. Any previously configured reader is replaced.
   *
   * @param aDesc
   *          the reader description.
   * @throws IOException
   *           if the descriptor cannot be written to the temporary space from where it is imported
   *           by the CPE.
   * @throws SAXException
   *           if the reader descriptor cannot be serialized to XML.
   * @throws CpeDescriptorException
   *           if there was a problem adding the reader to the CPE.
   */
  public void setReader(CollectionReaderDescription aDesc) throws IOException, SAXException,
          CpeDescriptorException {
    // Remove all collection readers
    cpeDesc.setAllCollectionCollectionReaders(new CpeCollectionReader[0]);

    // The CPE references the reader by URL, so the description is first written to a temp file.
    URL descUrl = materializeDescriptor(aDesc).toURI().toURL();
    CpeCollectionReader reader = produceCollectionReader(descUrl.toString());
    cpeDesc.addCollectionReader(reader);
  }

  /**
   * Set the analysis engine used by the CPE. If this is an aggregate analysis engine, it will be
   * disassembled and each delegate becomes one CPE CAS processor. Previously configured CAS
   * processors are removed before the new ones are added; if this method fails, the CPE
   * descriptor may be left with only a part of the new processors.
   *
   * @param aDesc
   *          the analysis engine description.
   * @throws IOException
   *           if the descriptor cannot be written to the temporary space from where it is imported
   *           by the CPE.
   * @throws SAXException
   *           if the analysis engine descriptor cannot be serialized to XML.
   * @throws CpeDescriptorException
   *           if there was a problem adding the analysis engine to the CPE.
   * @throws InvalidXMLException
   *           if import resolution failed
   * @throws IllegalArgumentException
   *           if an aggregate does not use a {@link FixedFlow}, or if a delegate named in the
   *           flow is missing or is not an {@link AnalysisEngineDescription}.
   */
  public void setAnalysisEngine(AnalysisEngineDescription aDesc) throws IOException, SAXException,
          CpeDescriptorException, InvalidXMLException {
    // Remove all CAS processors
    cpeDesc.setCpeCasProcessors(null);

    if (aDesc.isPrimitive()) {
      // For a primitive AE we just add it.
      cpeDesc.addCasProcessor(createProcessor("delegate", aDesc));
      return;
    }

    // For an aggregate AE we dive into the first aggregation level and add each of the
    // contained AEs separately, thus allowing us to control their properties separately
    Map<String, ResourceSpecifier> delegates = aDesc.getDelegateAnalysisEngineSpecifiers();

    // Only a fixed flow defines the linear order that is needed to line up the CAS processors.
    Object flowConstraints = aDesc.getAnalysisEngineMetaData().getFlowConstraints();
    if (!(flowConstraints instanceof FixedFlow)) {
      throw new IllegalArgumentException(
              "Only aggregate analysis engines using a FixedFlow are supported, but found: "
                      + (flowConstraints == null ? "no flow constraints"
                              : flowConstraints.getClass().getName()));
    }
    FixedFlow flow = (FixedFlow) flowConstraints;

    for (String key : flow.getFixedFlow()) {
      ResourceSpecifier delegate = delegates.get(key);
      if (!(delegate instanceof AnalysisEngineDescription)) {
        throw new IllegalArgumentException("Delegate [" + key
                + "] must be an analysis engine description, but found: "
                + (delegate == null ? "no delegate with this key"
                        : delegate.getClass().getName()));
      }
      cpeDesc.addCasProcessor(createProcessor(key, (AnalysisEngineDescription) delegate));
    }
  }

  /**
   * Get the CPE descriptor that is being assembled by this builder.
   *
   * @return the descriptor instance held by this builder (not a copy).
   */
  public CpeDescription getCpeDescription() {
    return cpeDesc;
  }

  /**
   * Create the Collection Processing Engine from the components configured so far. The CAS pool
   * size is set to the maximum processing unit thread count plus two; if that count is {@code 0},
   * the pool size is set to three and the processing unit thread count of the descriptor is not
   * changed.
   *
   * @param aListener
   *          a listener to register with the new CPE; may be {@code null} to register none.
   * @return the new CPE.
   * @throws ResourceInitializationException
   *           if the CPE could not be produced from the descriptor.
   * @throws CpeDescriptorException
   *           if there was a problem configuring the CPE descriptor.
   * @throws IllegalStateException
   *           if the CPE descriptor has no CAS processors, e.g. because no analysis engine has
   *           been set.
   */
  public CollectionProcessingEngine createCpe(StatusCallbackListener aListener)
          throws ResourceInitializationException, CpeDescriptorException {
    // Without this guard the pool size configuration below fails with a bare
    // NullPointerException when no CAS processor has been added.
    if (cpeDesc.getCpeCasProcessors() == null) {
      throw new IllegalStateException(
              "The CPE has no CAS processors; set an analysis engine before creating the CPE.");
    }

    ResourceManager resMgr = ResourceManagerFactory.newResourceManager();

    if (maxProcessingUnitThreadCount == 0) {
      cpeDesc.getCpeCasProcessors().setPoolSize(1 + CAS_POOL_HEADROOM);
    } else {
      cpeDesc.getCpeCasProcessors().setPoolSize(maxProcessingUnitThreadCount + CAS_POOL_HEADROOM);
      cpeDesc.setProcessingUnitThreadCount(maxProcessingUnitThreadCount);
    }

    CollectionProcessingEngine cpe = produceCollectionProcessingEngine(cpeDesc, resMgr, null);
    if (aListener != null) {
      cpe.addStatusCallbackListener(aListener);
    }
    return cpe;
  }

  /**
   * Writes a temporary file containing a XML descriptor of the given resource. Returns the file.
   * The file is registered for deletion when the JVM exits.
   *
   * @param resource
   *          A resource specifier that should be materialized.
   * @return The file containing the XML representation of the given resource.
   * @throws IOException
   *           if the temporary file cannot be created or written.
   * @throws SAXException
   *           if the resource cannot be serialized to XML.
   */
  private static File materializeDescriptor(ResourceSpecifier resource) throws IOException,
          SAXException {
    File tempDesc = File.createTempFile("desc", ".xml");
    tempDesc.deleteOnExit();

    try (OutputStream os = new FileOutputStream(tempDesc)) {
      resource.toXML(os);
    }

    return tempDesc;
  }

  /**
   * Creates an integrated CAS processor that includes the given analysis engine description via a
   * temporary descriptor file. The processor is configured with a processing unit thread count of
   * 1, a maximum error count of 0 and the {@value #ACTION_ON_MAX_ERROR} action on reaching it.
   *
   * @param key
   *          the name of the CAS processor.
   * @param aDesc
   *          the analysis engine description to wrap.
   * @return the configured CAS processor.
   * @throws IOException
   *           if the descriptor cannot be written to the temporary space.
   * @throws SAXException
   *           if the descriptor cannot be serialized to XML.
   * @throws CpeDescriptorException
   *           if the CAS processor cannot be configured.
   */
  private static CpeIntegratedCasProcessor createProcessor(String key,
          AnalysisEngineDescription aDesc) throws IOException, SAXException, CpeDescriptorException {
    URL descUrl = materializeDescriptor(aDesc).toURI().toURL();

    CpeInclude cpeInclude = getResourceSpecifierFactory().createInclude();
    cpeInclude.set(descUrl.toString());

    CpeComponentDescriptor ccd = getResourceSpecifierFactory().createDescriptor();
    ccd.setInclude(cpeInclude);

    CpeIntegratedCasProcessor proc = produceCasProcessor(key);
    proc.setCpeComponentDescriptor(ccd);
    proc.setAttributeValue(CpeDefaultValues.PROCESSING_UNIT_THREAD_COUNT, 1);
    proc.setActionOnMaxError(ACTION_ON_MAX_ERROR);
    proc.setMaxErrorCount(0);
    return proc;
  }
}