blob: 062bbf09cb90310ba01a669827acd7e93e46cdb1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.as.client;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.uima.aae.UimaAsThreadFactory;
import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.definition.connectors.ListenerCallback;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.spi.transport.vm.UimaVmQueue;
import org.springframework.core.task.TaskExecutor;
/**
 * Listener that takes {@link DirectMessage}s off an in-JVM {@link BlockingQueue} and hands each
 * one to a {@link DirectInputChannel} on an executor thread.
 *
 * <p>Typical life cycle: configure through the {@code with...}/{@code set...} methods, call
 * {@link #initialize()} to create the executor, {@link #start()} to launch the single queue
 * consumer thread, and {@link #stop()} to shut everything down. A stopped listener is not
 * restartable: its executor has been shut down.
 */
public class DirectListener implements Listener, JavaQueueListener {
  /** Queue this listener consumes from. */
  private BlockingQueue<DirectMessage> inQueue;
  /** Channel that receives every non-poison-pill message. */
  private DirectInputChannel ic;
  /** Pool size used when a plain fixed thread pool is created in {@link #initialize()}. */
  private int consumerCount = 1;
  /** Runs the per-message processing tasks; created by {@link #initialize()}. */
  private ExecutorService executor;
  private AnalysisEngineController controller;
  /** Pool size used for the ProcessCAS executor of a primitive controller. */
  private int scaleout;
  /** The single thread that blocks on {@link #inQueue}; null until {@link #start()} is called. */
  private Thread msgHandlerThread;
  private BlockingQueue<Runnable> workQueue = null;
  private CountDownLatch latchToCountNumberOfInitedThreads;
  private CountDownLatch latchToCountNumberOfTerminatedThreads;
  private Type type;
  private Transport transport = Transport.Java;
  private String name = null;
  /** True once {@link #start()} has launched the consumer thread. */
  private volatile boolean started = false;

  /**
   * Creates a listener of the given type with no controller attached yet.
   *
   * @param t the kind of messages this listener serves
   */
  public DirectListener(Type t) {
    this.type = t;
    System.out.println("DirectListener c'tor - type:"+type.name());
  }

  /**
   * Creates a listener of the given type bound to a controller.
   *
   * @param ctlr the controller this listener belongs to
   * @param t the kind of messages this listener serves
   * @param scaleout number of threads for the ProcessCAS executor of a primitive controller
   */
  public DirectListener(AnalysisEngineController ctlr, Type t, int scaleout) {
    this.controller = ctlr;
    this.scaleout = scaleout;
    this.type = t;
    System.out.println("DirectListener c'tor - type:"+type.name());
  }

  /** @return the transport of this listener; always {@code Transport.Java} */
  public Transport getTransport() {
    return transport;
  }

  /** @return the queue this listener consumes from, or null if none has been set */
  public BlockingQueue<DirectMessage> getEndpoint() {
    return inQueue;
  }

  /** @return the type passed to the constructor */
  public Type getType() {
    return type;
  }

  /**
   * Sets the controller this listener belongs to.
   *
   * @param ctlr the controller
   */
  public void setController(AnalysisEngineController ctlr) {
    this.controller = ctlr;
  }

  /**
   * Fluent variant of {@link #setController(AnalysisEngineController)}.
   *
   * @param ctlr the controller
   * @return this listener
   */
  public DirectListener withController(AnalysisEngineController ctlr) {
    this.controller = ctlr;
    return this;
  }

  /**
   * Sets the scaleout and re-creates both thread-counting latches with that count.
   *
   * @param howMany number of threads; must not be negative
   */
  public void setConsumerThreads(int howMany) {
    this.scaleout = howMany;
    resetThreadLatches(howMany);
  }

  /**
   * Fluent variant of {@link #setConsumerThreads(int)}.
   *
   * @param howMany number of threads; must not be negative
   * @return this listener
   */
  public DirectListener withConsumerThreads(int howMany) {
    this.scaleout = howMany;
    resetThreadLatches(howMany);
    return this;
  }

  /**
   * Sets an explicit name, returned by {@link #getName()} in place of the controller key.
   *
   * @param name the listener name
   * @return this listener
   */
  public DirectListener withName(String name) {
    this.name = name;
    return this;
  }

  /**
   * Sets the queue to consume from.
   *
   * @param inQueue the input queue
   * @throws Exception declared for interface compatibility; not thrown by this implementation
   */
  public void setQueue(BlockingQueue<DirectMessage> inQueue) throws Exception {
    this.inQueue = inQueue;
  }

  /**
   * Fluent variant of {@link #setQueue(BlockingQueue)}.
   *
   * @param inQueue the input queue
   * @return this listener
   * @throws Exception declared for interface compatibility; not thrown by this implementation
   */
  public DirectListener withQueue(BlockingQueue<DirectMessage> inQueue) throws Exception {
    this.inQueue = inQueue;
    return this;
  }

  /**
   * Sets the size of the plain fixed thread pool created by {@link #initialize()}.
   *
   * @param howMany number of pool threads
   */
  public void setConsumers(int howMany) {
    consumerCount = howMany;
  }

  /**
   * Fluent variant of {@link #setInputChannel(DirectInputChannel)}.
   *
   * @param ic the channel that receives messages
   * @return this listener
   */
  public DirectListener withInputChannel(DirectInputChannel ic) {
    this.ic = ic;
    return this;
  }

  /**
   * Sets the channel that receives messages.
   *
   * @param ic the channel that receives messages
   */
  public void setInputChannel(DirectInputChannel ic) {
    this.ic = ic;
  }

  /**
   * Creates the executor that will run message-processing tasks.
   *
   * <p>For a ProcessCAS listener attached to a {@link PrimitiveAnalysisEngineController}, a
   * {@link ThreadPoolExecutor} with {@code scaleout} threads is built on a
   * {@link UimaAsThreadFactory}, all core threads are pre-started, and this method blocks until
   * the inited-threads latch reaches zero. In every other case a fixed thread pool of
   * {@code consumerCount} threads is created.
   *
   * @return this listener
   * @throws Exception the exception reported to the callback if thread initialization failed
   */
  public DirectListener initialize() throws Exception {
    if (!Type.ProcessCAS.equals(type)) {
      executor = Executors.newFixedThreadPool(consumerCount);
      return this;
    }
    workQueue = new UimaVmQueue();
    if (!(controller instanceof PrimitiveAnalysisEngineController)) {
      executor = Executors.newFixedThreadPool(consumerCount);
      return this;
    }

    ThreadGroup threadGroup = new ThreadGroup("VmThreadGroup" + 1 + "_" + controller.getComponentName());
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(scaleout, scaleout, Long.MAX_VALUE, TimeUnit.DAYS, workQueue);
    executor = pool;

    // The latches are otherwise only created by setConsumerThreads/withConsumerThreads; a listener
    // built with the (controller, type, scaleout) constructor would hit a null latch below.
    if (latchToCountNumberOfInitedThreads == null || latchToCountNumberOfTerminatedThreads == null) {
      resetThreadLatches(scaleout);
    }

    DirectListenerCallback callback = new DirectListenerCallback(this);
    UimaAsThreadFactory tf = new UimaAsThreadFactory().
        withCallback(callback).
        withThreadGroup(threadGroup).
        withPrimitiveController((PrimitiveAnalysisEngineController) controller).
        withTerminatedThreadsLatch(latchToCountNumberOfTerminatedThreads).
        withInitedThreadsLatch(latchToCountNumberOfInitedThreads);
    tf.setDaemon(true);
    pool.setThreadFactory(tf);
    pool.prestartAllCoreThreads();

    // Block until the inited-threads latch handed to the thread factory has been counted down.
    latchToCountNumberOfInitedThreads.await();
    if (callback.failedInitialization()) {
      throw callback.getException();
    }
    System.out.println("Executor Started - All Process Threads Initialized");
    return this;
  }

  /** Creates fresh inited/terminated latches, each with the given count. */
  private void resetThreadLatches(int count) {
    latchToCountNumberOfInitedThreads = new CountDownLatch(count);
    latchToCountNumberOfTerminatedThreads = new CountDownLatch(count);
  }

  /** Returns true when the message is the poison pill that ends queue consumption. */
  private boolean stopConsumingMessages(DirectMessage message ) {
    return message.getAsInt(AsynchAEMessage.Command) == AsynchAEMessage.PoisonPill;
  }

  /**
   * Launches the queue consumer thread. Calling this method again after a successful start is a
   * no-op.
   */
  public synchronized void start() {
    if ( started ) {
      return;
    }
    System.out.println(">>> "+controller.getComponentName()+" DirectListener.start()");
    msgHandlerThread = new Thread() {
      public void run() {
        consumeMessages();
      }
    };
    msgHandlerThread.start();
    // Set the flag here, while still holding the monitor, instead of inside the new thread:
    // otherwise a second start() arriving before the thread is scheduled would still see
    // started == false and spawn a duplicate consumer.
    started = true;
  }

  /**
   * Body of the consumer thread: takes messages until the controller is stopped, a poison pill
   * arrives, the thread is interrupted, or an unexpected exception occurs.
   */
  private void consumeMessages() {
    boolean stop = false;
    while ( !controller.isStopped() && !stop ) {
      try {
        final DirectMessage message = inQueue.take(); // blocks if empty
        if ( stopConsumingMessages(message) ) {
          System.out.println(">>> "+controller.getComponentName()+" Got END message - Stopping Queue Consumer");
          stop = true;
        } else {
          dispatch(message);
        }
      } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
        System.out.println(getType()+ " Listener Thread Interrupted - Stop Listening");
        stop = true;
      } catch ( Exception e ) {
        // Includes RejectedExecutionException from an executor that has been shut down.
        e.printStackTrace();
        stop = true;
      }
    }
  }

  /** Submits a task that delivers the message to the input channel on an executor thread. */
  private void dispatch(final DirectMessage message) {
    executor.submit(new Runnable() {
      public void run() {
        try {
          System.out.println(">>> "+controller.getComponentName()+" Got new message - processing on thread "+Thread.currentThread().getName()+" channel:"+getType());
          ic.onMessage(message);
        } catch ( Exception e ) {
          e.printStackTrace();
        }
      }
    });
  }

  /**
   * Stops the consumer thread and shuts down the executor. Safe to call when the listener was
   * never started.
   */
  public void stop() {
    Thread handler = msgHandlerThread;
    if ( handler != null ) {
      handler.interrupt();
      if ( inQueue != null ) {
        DirectMessage message = new DirectMessage().withCommand(AsynchAEMessage.PoisonPill);
        // Non-blocking on purpose: the consumer has just been interrupted, so on a full bounded
        // queue a blocking put() could wait forever. The interrupt alone ends the consumer loop;
        // the pill is only a second signal, so a rejected offer is harmless.
        inQueue.offer(message);
      }
    }
    if ( executor != null ) {
      executor.shutdown();
    }
  }

  /** @return always null; this listener does not expose a Spring {@link TaskExecutor} */
  public TaskExecutor getTaskExecutor() {
    return null;
  }

  /**
   * @return the name set through {@link #withName(String)}, or the controller's key when no
   *         name was set
   */
  public String getName() {
    if ( name != null ) {
      return name;
    }
    return controller.getKey();
  }

  /**
   * Callback handed to the thread factory in {@link DirectListener#initialize()}; it records an
   * initialization error so that {@code initialize()} can rethrow it.
   */
  public class DirectListenerCallback implements ListenerCallback{
    // Kept for constructor compatibility; not read anywhere in this class.
    private DirectListener dl;
    private boolean initializationFailed = false;
    private Exception exception;

    /**
     * @param l the listener this callback was created for
     */
    public DirectListenerCallback(DirectListener l) {
      this.dl = l;
    }

    /**
     * Records the failure and its cause.
     *
     * @param e the exception describing the initialization error
     */
    public void onInitializationError(Exception e) {
      initializationFailed = true;
      exception = e;
    }

    /** @return true if {@link #onInitializationError(Exception)} has been called */
    public boolean failedInitialization() {
      return initializationFailed;
    }

    /** @return the last exception passed to {@link #onInitializationError(Exception)}, or null */
    public Exception getException() {
      return exception;
    }
  }
}