blob: 4ad02c0503a2fc409cfb40b926f3821d4845145a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.user.jp;
import java.io.File;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.uima.UIMAFramework;
import org.apache.uima.analysis_engine.AnalysisEngine;
import org.apache.uima.analysis_engine.AnalysisEngineManagement;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.cas.CAS;
import org.apache.uima.ducc.user.common.UimaUtils;
import org.apache.uima.ducc.user.jp.uima.UimaAnalysisEngineInstancePoolWithThreadAffinity;
import org.apache.uima.resource.Resource;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceManager;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.util.CasPool;
import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
public class UimaProcessContainer extends DuccAbstractProcessContainer {
public static final String IMPORT_BY_NAME_PREFIX = "*importByName:";
// private DuccUimaSerializer uimaSerializer = new DuccUimaSerializer();
private static ResourceManager rm=null;
Semaphore sharedInitSemaphore = new Semaphore(1);
// this map enforces thread affinity to specific thread. Needed to make
// sure that a thread used to initialized the AE is used to call process().
// Some AEs depend on ThreadLocal storage.
UimaAnalysisEngineInstancePoolWithThreadAffinity instanceMap = new UimaAnalysisEngineInstancePoolWithThreadAffinity();
private static CasPool casPool = null;
AtomicInteger counter = new AtomicInteger();
private String analysisEngineDescriptor=null;
private static CountDownLatch latch = new CountDownLatch(1);
/**
* The platform MBean server if one is available (Java 1.5 only)
*/
private static Object platformMBeanServer;
static {
// try to get platform MBean Server (Java 1.5 only)
try {
Class<?> managementFactory = Class.forName("java.lang.management.ManagementFactory");
Method getPlatformMBeanServer = managementFactory.getMethod("getPlatformMBeanServer",
new Class[0]);
platformMBeanServer = getPlatformMBeanServer.invoke(null, (Object[]) null);
} catch (Exception e) {
platformMBeanServer = null;
}
}
// maintain thread affinity to specific instance of AE
private volatile boolean threadAffinity=true;
/*
private String gen(int length) {
StringBuffer sb = new StringBuffer();
for(int i=length; i > 0; i -= 12) {
int n = Math.min(12, Math.abs(i));
sb.append(org.apache.commons.lang.StringUtils.leftPad(Long.toString(Math.round(Math.random()*Math.pow(36,n)),n),'0'));
}
return sb.toString();
}
*/
public boolean useThreadAffinity() {
return threadAffinity;
}
// UIMA-5428 Generate the pieces-parts AE only if the JD-generated one is not accessible
private int configureAndGetScaleout(String[] args ) throws Exception {
analysisEngineDescriptor = ArgsParser.getArg("-aed", args);
if (analysisEngineDescriptor == null || !new File(analysisEngineDescriptor).canRead()) {
analysisEngineDescriptor = buildDeployable();
System.out.println("Created descriptor: "+analysisEngineDescriptor);
}
String threadCount = ArgsParser.getArg("-t", args); // Will be null if ducc.deploy.JpThreadCount is undefined
scaleout = threadCount==null ? 1 : Integer.valueOf(threadCount); // Default to 1
return scaleout;
}
public byte[] getLastSerializedError() throws Exception {
if (lastError != null) {
return serialize(lastError);
}
return null;
}
public int doInitialize(Properties props, String[] args) throws Exception {
return configureAndGetScaleout(args);
}
public void doDeploy() throws Exception {
System.out.println("....... UimaProcessContainer.doDeploy()");
ResourceSpecifier rSpecifier = null;
HashMap<String,Object> paramsMap =
new HashMap<String,Object>();
synchronized(UimaProcessContainer.class) {
if ( rm == null ) {
rm = UIMAFramework.newDefaultResourceManager();
}
}
paramsMap.put(Resource.PARAM_RESOURCE_MANAGER, rm);
paramsMap.put(AnalysisEngine.PARAM_MBEAN_SERVER, platformMBeanServer);
try {
// Acquire single-permit semaphore to serialize instantiation of
// AEs. This is done to control access to non-thread safe
// structures in the core. The sharedInitSemaphore is a static
// and is shared by all instances of this class.
sharedInitSemaphore.acquire();
// Parse the descriptor in the calling thread.
rSpecifier = UimaUtils.getResourceSpecifier(analysisEngineDescriptor);
AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier,
paramsMap);
instanceMap.checkin(ae);
if (instanceMap.size() == scaleout) {
try {
Properties props = new Properties();
props.setProperty(UIMAFramework.CAS_INITIAL_HEAP_SIZE, "1000");
analysisEngineMetadata = ae.getAnalysisEngineMetaData();
casPool = new CasPool(scaleout, analysisEngineMetadata,rm);
latch.countDown();
} catch (Exception e) {
throw new ResourceInitializationException(e);
}
}
} catch (Throwable e) {
latch.countDown();
Logger logger = UIMAFramework.getLogger();
logger.log(Level.WARNING, "UimaProcessContainer", e);
e.printStackTrace();
throw new RuntimeException(e);
} finally {
sharedInitSemaphore.release();
}
}
public void doStop() throws Exception {
try {
AnalysisEngine ae = instanceMap.checkout();
if ( ae != null ) {
ae.destroy();
}
} catch( Exception e) {
e.printStackTrace();
} finally {
}
}
public List<Properties> doProcess(Object xmi) throws Exception {
AnalysisEngine ae = null;
latch.await();
CAS cas = casPool.getCas();
try {
// reset last error
lastError = null;
// deserialize the CAS
getUimaSerializer().deserializeCasFromXmi((String)xmi, cas);
// the following checks out AE instance pinned to this thread
ae = instanceMap.checkout();
List<AnalysisEnginePerformanceMetrics> beforeAnalysis = getMetrics(ae);
ae.process(cas);
List<AnalysisEnginePerformanceMetrics> afterAnalysis = getMetrics(ae);
// get the delta
List<AnalysisEnginePerformanceMetrics> casMetrics = getAEMetricsForCAS(
afterAnalysis, beforeAnalysis);
// convert UIMA-AS metrics into properties so that we can return
// this
// data in a format which doesnt require UIMA-AS to digest
List<Properties> metricsList = new ArrayList<Properties>();
for (AnalysisEnginePerformanceMetrics metrics : casMetrics) {
Properties p = new Properties();
p.setProperty("name", metrics.getName());
p.setProperty("uniqueName", metrics.getUniqueName());
p.setProperty("analysisTime",
String.valueOf(metrics.getAnalysisTime()));
p.setProperty("numProcessed",
String.valueOf(metrics.getNumProcessed()));
metricsList.add(p);
}
return metricsList;
} catch( Throwable e ) {
lastError = e;
Logger logger = UIMAFramework.getLogger();
logger.log(Level.WARNING, "UimaProcessContainer", e);
e.printStackTrace();
throw new AnalysisEngineProcessException();
}
finally {
if (ae != null) {
instanceMap.checkin(ae);
}
if (cas != null) {
casPool.releaseCas(cas);
}
}
}
private List<AnalysisEnginePerformanceMetrics> getMetrics(AnalysisEngine ae)
throws Exception {
List<AnalysisEnginePerformanceMetrics> analysisManagementObjects = new ArrayList<AnalysisEnginePerformanceMetrics>();
synchronized(UimaProcessContainer.class) {
// Fetch AE's management information that includes per component
// performance stats
// These stats are internally maintained in a Map. If the AE is an
// aggregate
// the Map will contain AnalysisEngineManagement instance for each AE.
AnalysisEngineManagement aem = ae.getManagementInterface();
if (aem.getComponents().size() > 0) {
// Flatten the hierarchy by recursively (if this AE is an aggregate)
// extracting
// primitive AE's AnalysisEngineManagement instance and placing it
// in
// afterAnalysisManagementObjects List.
getLeafManagementObjects(aem, analysisManagementObjects);
// System.out.println("-----------------Unique1:"+aem.getUniqueMBeanName());
// System.out.println("-----------------Simple1:"+aem.getName());
} else {
String path = produceUniqueName(aem);
// System.out.println(Thread.currentThread().getId()+" -----------------Unique2:"+aem.getUniqueMBeanName());
// System.out.println(Thread.currentThread().getId()+" -----------------Simple2:"+aem.getName());
// System.out.println(Thread.currentThread().getId()+" -----------------Path:"+path);
analysisManagementObjects.add(deepCopyMetrics(aem, path));
}
}
return analysisManagementObjects;
}
private void getLeafManagementObjects(AnalysisEngineManagement aem,
List<AnalysisEnginePerformanceMetrics> result) {
getLeafManagementObjects(aem, result, "");
}
/**
* Recursively
*
* @param aem
* @param result
* @param uimaFullyQualifiedAEContext
*/
private void getLeafManagementObjects(AnalysisEngineManagement aem,
List<AnalysisEnginePerformanceMetrics> result,
String uimaFullyQualifiedAEContext) {
// System.out.println("----------- 1 getLeafManagementObjects() - Unique Name:"+aem.getUniqueMBeanName()+" UniqueContext:"+uimaFullyQualifiedAEContext);
if (aem.getComponents().isEmpty()) {
// skip Flow Controller
if (!aem.getName().equals("Fixed Flow Controller")) {
// is this primitive AE delegate in an aggregate. If so the
// mbean unique name will have "p0=" string. An examples mbean
// name looks like this:
// org.apache.uima:type=ee.jms.services,s=Top Level Aggregate
// TAE Uima EE Service,p0=Top Level Aggregate TAE
// Components,p1=SecondLevelAggregateCM
// Components,p2=ThirdLevelAggregateCM
// Components,name=Multiplier1
if (aem.getUniqueMBeanName().indexOf("p0=") > -1) {
int p1indx = aem.getUniqueMBeanName().indexOf("p1=");
if ( p1indx > -1 ) {
String tmp = aem.getUniqueMBeanName().substring(p1indx);
String[] parts = tmp.split(",");
for( String part : parts ) {
if ( part.startsWith("name=") ) {
uimaFullyQualifiedAEContext += "/"+part.substring(5);
break;
}
}
} else {
uimaFullyQualifiedAEContext = "";
}
}
result.add(deepCopyMetrics(aem, uimaFullyQualifiedAEContext));
}
} else {
for (AnalysisEngineManagement child : (Iterable<AnalysisEngineManagement>) aem
.getComponents().values()) {
getLeafManagementObjects(child, result, produceUniqueName(aem));
}
}
}
private String produceUniqueName(AnalysisEngineManagement aem) {
String[] parts = aem.getUniqueMBeanName().split(",");
StringBuffer sb = new StringBuffer();
for (String part : parts) {
int pos;
if ((pos = part.indexOf("=")) > -1 && part.startsWith("p")) {
String n = part.substring(pos + 1, part.indexOf(" Components"));
if (part.startsWith("p0=") && n.indexOf(" ") > -1) {
String indx = n.substring(n.lastIndexOf(" "));
if (indx != null) {
int instanceNumber = -1;
try {
instanceNumber = Integer.parseInt(indx.trim());
sb.append(instanceNumber).append(" Components ");
n = n.substring(0, n.lastIndexOf(" "));
} catch (NumberFormatException nfe) {
}
}
}
sb.append("/").append(n.trim());
} else if (part.trim().startsWith("name=") || part.trim().startsWith("org.apache.uima:name=")) {
sb.append("/").append(
part.substring(part.trim().indexOf("=") + 1));
}
}
return sb.toString();
}
private AnalysisEnginePerformanceMetrics deepCopyMetrics(
AnalysisEngineManagement aem, String uimaFullyQualifiedAEContext) {
String index = "";
// Create a unique name with each AE name is separated with "/". Prepend
// "X Components" where
// X is a instance number of a scaled AE. Also, strip the X from the AE
// name. The instance number
// is added to each scaled up component during initialization of the
// uima-as. We need to prepend
// "X Components" to allow DUCC JD to parse the unique name correctly (
// basically for backwards
// compatibility.
int pos = aem.getUniqueMBeanName().lastIndexOf("name=");
if (pos > -1) {
// get the name of the component. In case of nested component this
// will be the KEY from AE descriptor
String tmp = aem.getUniqueMBeanName().substring(pos + 5);
// in case this is the top level AE, check if it has been scaled up
// by extracting its instance number.For example,
// NoOpAnnotator 2.
int last = tmp.lastIndexOf(" ");
if ( last == -1 ) {
index = "1";
} else {
index = tmp.substring(last).trim();
}
// System.out.println("uimaFullyQualifiedAEContext.trim().length()="+uimaFullyQualifiedAEContext.trim().length() );
if (uimaFullyQualifiedAEContext.trim().length() > 0 && last > -1) {
// extract instance number
try {
// check if the instance number is a number. If not silently
// handle the exception.
Integer.parseInt(index);
// System.out.println("deepCopyMetrics - context:"+uimaFullyQualifiedAEContext+" last="+last);
// strip the instance number from the AE name
uimaFullyQualifiedAEContext = uimaFullyQualifiedAEContext
.substring(0, last + 1);
} catch (NumberFormatException nfe) {
} catch( Exception e) {
// System.out.println(Thread.currentThread().getId()+" deepCopyMetrics - context:"+uimaFullyQualifiedAEContext+" last="+last);
}
} else {
if (!uimaFullyQualifiedAEContext.endsWith(tmp)) {
uimaFullyQualifiedAEContext += "/" + tmp;
}
}
}
// Primitive AE will not have "X Components" prefix, but it is required
// by the DUCC JD to be there. Prepend it to the unique name.
/*
if (uimaFullyQualifiedAEContext.indexOf(" Components ") == -1) {
uimaFullyQualifiedAEContext = index + " Components "
+ uimaFullyQualifiedAEContext;
}
*/
return new AnalysisEnginePerformanceMetrics(aem.getName(),
uimaFullyQualifiedAEContext, aem.getAnalysisTime(),
aem.getNumberOfCASesProcessed());
}
private List<AnalysisEnginePerformanceMetrics> getAEMetricsForCAS(
List<AnalysisEnginePerformanceMetrics> afterAnalysisManagementObjects,
List<AnalysisEnginePerformanceMetrics> beforeAnalysisManagementObjects)
throws Exception {
// Create a List to hold per CAS analysisTime and total number of CASes processed by each AE.
// This list will be serialized and sent to the client
List<AnalysisEnginePerformanceMetrics> performanceList = new ArrayList<AnalysisEnginePerformanceMetrics>();
// Diff the before process() performance metrics with post process performance metrics
for (AnalysisEnginePerformanceMetrics after : afterAnalysisManagementObjects) {
for (AnalysisEnginePerformanceMetrics before : beforeAnalysisManagementObjects) {
String uniqueName = after.getUniqueName();
if (before.getUniqueName().equals(after.getUniqueName())) {
if ( (after.getAnalysisTime() - before.getAnalysisTime()) < 0 ) {
Logger logger = UIMAFramework.getLogger();
logger.log(Level.WARNING, "Thread:"+Thread.currentThread()+" UimaProcessContainer.getAEMetricsForCAS() - Unexpected negative result for analysis time:"+(after.analysisTime-before.analysisTime)+" Component:"+uniqueName+" before="+before.analysisTime+" after="+after.analysisTime);
}
AnalysisEnginePerformanceMetrics metrics = new AnalysisEnginePerformanceMetrics(
after.getName(), uniqueName,
after.getAnalysisTime() - before.getAnalysisTime(),
after.getNumProcessed());
performanceList.add(metrics);
break;
}
}
}
return performanceList;
}
private static class AnalysisEnginePerformanceMetrics {
private String name;
private String uniqueName;
private long analysisTime;
private long numProcessed;
/**
* Creates a performance metrics instance
*
*/
public AnalysisEnginePerformanceMetrics(String name, String uimaContextPath, long analysisTime, long numProcessed ) {
this.name = name;
this.uniqueName = uimaContextPath;
this.analysisTime = analysisTime;
this.numProcessed = numProcessed;
}
/**
* Gets the local name of the component as specified in the aggregate
*
* @return the name
*/
public String getName() {
return name;
}
/**
* Gets the unique name of the component reflecting its location in the aggregate hierarchy
*
* @return the unique name
*/
public String getUniqueName() {
if ( uniqueName != null && uniqueName.trim().length() > 0 && !uniqueName.trim().equals("Components")) {
// if ( !uimaContextPath.endsWith(getName())) {
// return uimaContextPath+"/"+getName();
// }
return uniqueName;
} else {
return getName();
}
}
/**
* Gets the elapsed time the CAS spent analyzing this component
*
* @return time in milliseconds
*/
public long getAnalysisTime() {
return analysisTime;
}
/**
* Gets the total number of CASes processed by this component so far
*
* @return number processed
*/
public long getNumProcessed() {
return numProcessed;
}
}
}