blob: a5089f69215cb3e9b469b274a76abea41dbba248 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.adapter.jms.activemq;
import java.io.InvalidClassException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaAsThreadFactory;
import org.apache.uima.aae.UimaBlockingExecutor;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.util.Level;
import org.springframework.jms.listener.SessionAwareMessageListener;
/**
* Message listener injected at runtime into Aggregate to handle a race condition when multiple
* threads simultaneously process messages from a Cas Multiplier. It is only used to process
* messages from a Cas Multiplier and only if the reply queue has more than one consumer thread
* configured in a deployment descriptor. The listener creates a pool of threads equal to the number
* of concurrent consumers defined in the DD for the listener on the reply queue. Once the message
* is handled in onMessage(), it is than delegated for processing to one of the available threads
* from the pool.
*
* This listener guarantees processing order. It receives messages from Spring in a single thread
* and if it finds a child CAS in the message, it increments the parent (input) CAS child count and
* delegates processing to the InputChannel instance.
*
* The race condition: The Cas Multiplier sends the last child and the parent almost at the same
* time. Both are received by the aggregate and are processed in different threads, if a scaleout is
* used on the reply queue. One thread may start processing the input CAS while the other thread
* (with the last child) is not yet allowed to run. The first thread takes the input CAS all the way
* to the final step and since at this time, the input CAS has no children ( the thread processing
* the last child has not updated the child count yet), it will be prematurely released. When the
* thread with the last child is allowed to run, it finds that the parent no longer exists in the
* cache.
*
*
*/
public class ConcurrentMessageListener implements SessionAwareMessageListener {
private static final Class CLASS_NAME = ConcurrentMessageListener.class;
private SessionAwareMessageListener delegateListener;
private int concurrentThreadCount = 0;
private AnalysisEngineController controller;
private ThreadPoolExecutor executor = null;
private UimaBlockingExecutor blockingExecutor;
private LinkedBlockingQueue<Runnable> workQueue;
private CountDownLatch controllerLatch = new CountDownLatch(1);
/**
* Creates a listener with a given number of process threads. This listener is injected between
* Spring and JmsInputChannel to enable orderly processing of CASes. This listener is only used on
* reply queues that have scale out attribute in DD greater than 1. Its main job is to increment
* number of child CASes for a given input CAS. It does so in a single thread, and once it
* completes the update this listener submits the CAS for further processing up to the
* JmsInputChannel. The CAS is submitted to a queue where the executor assigns a free thread to
* process the CAS.
*
* @param concurrentThreads
* - number of threads to use to process CASes
* @param delegateListener
* - JmsInputChannel instance to delegate CAS to
* @throws InvalidClassException
*/
public ConcurrentMessageListener(int concurrentThreads, Object delegateListener, String destination, ThreadGroup threadGroup, String threadPrefix)
throws InvalidClassException {
if (!(delegateListener instanceof SessionAwareMessageListener)) {
throw new InvalidClassException("Invalid Delegate Listener. Expected Object of Type:"
+ SessionAwareMessageListener.class + " Received:" + delegateListener.getClass());
}
concurrentThreadCount = concurrentThreads;
this.delegateListener = (SessionAwareMessageListener) delegateListener;
if (concurrentThreads > 1) {
// created an unbounded queue. The throttling is controlled by the
// semaphore in the UimaBlockingExecutor initialized below
workQueue = new LinkedBlockingQueue<Runnable>();
executor = new ThreadPoolExecutor(concurrentThreads, concurrentThreads, Long.MAX_VALUE,
TimeUnit.NANOSECONDS, workQueue);
UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
tf.setDaemon(true);
tf.setThreadNamePrefix(threadPrefix);
executor.setThreadFactory(tf);
executor.prestartAllCoreThreads();
if ( destination != null ) {
blockingExecutor = new UimaBlockingExecutor(executor, concurrentThreads, destination);
} else {
blockingExecutor = new UimaBlockingExecutor(executor, concurrentThreads);
}
}
}
public ThreadPoolExecutor getTaskExecutor() {
return executor;
}
public void stop() {
blockingExecutor.stop();
}
public void setAnalysisEngineController(AnalysisEngineController controller) {
this.controller = controller;
controllerLatch.countDown();
}
private boolean isMessageFromCasMultiplier(final Message message) throws JMSException {
return message.propertyExists(AsynchAEMessage.CasSequence);
}
/**
* Intercept a message to increment a child count of the input CAS. This method is always called
* in a single thread, guaranteeing order of processing. The child CAS will always come here
* first. Once the count is updated, or this CAS is not an child, the message will be delegated to
* one of the threads in the pool that will eventually call InputChannel object where the actual
* processing of the message begins.
*
*/
public void onMessage(final Message message, final Session session) throws JMSException {
try {
// Wait until the controller is plugged in
controllerLatch.await();
} catch (InterruptedException e) {
}
if (isMessageFromCasMultiplier(message)) {
// Check if the message came from a Cas Multiplier and it contains a new Process Request
int command = message.getIntProperty(AsynchAEMessage.Command);
int messageType = message.getIntProperty(AsynchAEMessage.MessageType);
// Intercept Cas Process Request from a Cas Multiplier
if (command == AsynchAEMessage.Process && messageType == AsynchAEMessage.Request) {
String msgFrom = (String) message.getStringProperty(AsynchAEMessage.MessageFrom);
if (msgFrom != null && controller instanceof AggregateAnalysisEngineController) {
String delegateKey = ((AggregateAnalysisEngineController) controller)
.lookUpDelegateKey(msgFrom);
if (delegateKey != null) {
Delegate delegate = ((AggregateAnalysisEngineController) controller)
.lookupDelegate(delegateKey);
delegate.setConcurrentConsumersOnReplyQueue();
}
}
try {
String parentCasReferenceId = message
.getStringProperty(AsynchAEMessage.InputCasReference);
// Fetch parent CAS entry from the local cache
CasStateEntry parentEntry = controller.getLocalCache().lookupEntry(parentCasReferenceId);
// increment number of child CASes this parent has in play
parentEntry.incrementSubordinateCasInPlayCount();
// increment a counter that counts number of child CASes that have no
// flow object yet. The flow object is created for each child CAS from
// the parent flow object. The method below will actually acquire a
// permit from a binary semaphore to force the parent to block until
// the last of its children acquires its Flow object.
parentEntry.incrementOutstandingFlowCounter();
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_service_exception_WARNING", controller.getComponentName());
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
}
}
}
if (concurrentThreadCount > 1) {
// Delegate meesage to the JmsInputChannel
try {
blockingExecutor.submitTask(new Runnable() {
public void run() {
try {
delegateListener.onMessage(message, session);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_service_exception_WARNING", controller.getComponentName());
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
}
}
});
} catch( InterruptedException e) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
} else {
// Just handoff the message to the InputChannel object
delegateListener.onMessage(message, session);
}
}
}