blob: dcab473ba592a92acc2579c650c85435e66c6c61 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.aae;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl;
import org.apache.uima.util.Level;
/**
* Custom ThreadFactory for use in the TaskExecutor. The TaskExecutor is plugged in by Spring from
* spring xml file generated by dd2spring. The TaskExecutor is only defined for PrimitiveControllers
* and its main purpose is to provide thread pooling and management. Each new thread produced by
* this ThreadFactory is used to initialize a dedicated AE instance in the PrimitiveController.
*
*
*/
public class UimaAsThreadFactory implements ThreadFactory {
private static final Class<UimaAsThreadFactory> CLASS_NAME = UimaAsThreadFactory.class;
private static final String THREAD_POOL = "[UIMA AS ThreadPool ";
private PrimitiveAnalysisEngineController controller;
private ThreadGroup theThreadGroup;
private String threadNamePrefix=null;
private boolean isDaemon=false;
public static AtomicInteger poolIdGenerator = new AtomicInteger();
private final int poolId = poolIdGenerator.incrementAndGet();
private CountDownLatch latchToCountNumberOfTerminatedThreads;
private volatile boolean initFailed=false;
public UimaAsThreadFactory(ThreadGroup tGroup) {
this(tGroup,null);
}
/**
*
*
* @param tGroup
* @param aController
*/
public UimaAsThreadFactory(ThreadGroup tGroup, PrimitiveAnalysisEngineController aController) {
this( tGroup, aController, null);
}
public UimaAsThreadFactory(ThreadGroup tGroup, PrimitiveAnalysisEngineController aController, CountDownLatch latchToCountNumberOfTerminatedThreads) {
controller = aController;
theThreadGroup = tGroup;
this.latchToCountNumberOfTerminatedThreads = latchToCountNumberOfTerminatedThreads;
}
public void setThreadNamePrefix(String prefix) {
threadNamePrefix = prefix;
}
public void setThreadGroup( ThreadGroup tGroup) {
theThreadGroup = tGroup;
}
public void setDaemon(boolean daemon) {
// isDaemon = daemon;
}
public void stop() {
}
/**
* Creates a new thread, initializes instance of AE via a call on a given PrimitiveController.
* Once the thread finishes initializing AE instance in the controller, it calls run() on a given
* Runnable. This Runnable is a Worker instance managed by the TaskExecutor. When the thread calls
* run() on the Runnable it blocks until the Worker releases it.
*/
public Thread newThread(final Runnable r) {
Thread newThread = null;
try {
newThread = new Thread(theThreadGroup, new Runnable() {
public void run() {
if ( threadNamePrefix == null ) {
if ( controller != null ) {
threadNamePrefix = THREAD_POOL+poolId+"] "+controller.getComponentName() + " Process Thread";
} else {
threadNamePrefix = THREAD_POOL+poolId+"] ";
}
}
Thread.currentThread().setName( threadNamePrefix +" - "
+ Thread.currentThread().getId());
try {
if (controller != null && !controller.threadAssignedToAE()) {
// call the controller to initialize next instance of AE. Once initialized this
// AE instance process() method will only be called from this thread
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
"UimaAsThreadFactory.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_calling_ae_initialize__INFO", new Object[] {controller.getComponentName(),Thread.currentThread().getId()});
if ( !initFailed && !controller.getState().equals(ServiceState.FAILED) ) {
try {
controller.initializeAnalysisEngine();
} catch( Exception e) {
initFailed = true;
e.printStackTrace();
throw e;
}
} else {
return; // there was failure previously so just return
}
}
// Call given Worker (Runnable) run() method and block. This call block until the
// TaskExecutor is terminated.
r.run();
} catch (Throwable e) {
if ( !(e instanceof Exception) ) {
// try to log. If this is OOM, logging may not succeed and we
// get another OOM.
try {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"UimaAsThreadFactory", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
System.out.println(">>>>>>>>>>>>>>>>>>Exiting UIMA AS Process Due to Java Error "+e);
} catch( Throwable t ) {
// Failed during logging. We are tight on memory. Just exit
}
System.exit(-1);
}
return;
} finally {
if ( controller instanceof PrimitiveAnalysisEngineController_impl ) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
"UimaAsThreadFactory.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_process_thread_exiting__INFO", new Object[] {controller.getComponentName(),Thread.currentThread().getId()});
((PrimitiveAnalysisEngineController_impl)controller).destroyAE();
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
"UimaAsThreadFactory.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_ae_instance_destroy_called__INFO", new Object[] {controller.getComponentName(),Thread.currentThread().getId()});
if ( latchToCountNumberOfTerminatedThreads != null ) {
synchronized( latchToCountNumberOfTerminatedThreads ) {
latchToCountNumberOfTerminatedThreads.countDown();
}
}
}
}
}
});
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
if ( controller != null ) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"UimaAsThreadFactory", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_service_exception_WARNING", controller.getComponentName());
}
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"UimaAsThreadFactory", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
}
}
if ( newThread != null ) {
newThread.setDaemon(isDaemon);
}
return newThread;
}
}