blob: 7fb241f354665337924b022a199d7baaa3c727c0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.adapter.jms.activemq;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
//import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageWrapper;
import org.springframework.jms.listener.SessionAwareMessageListener;
/**
* This class main job is to receive a JMS message from Spring Listeners
* via onMessage() and wrapping it in a MessageWrapper before enqueuing that
* onto a BlockingPriorityQueue for process threads. Only two listener
* types send messages to this class: Targeted Listener and Process Listener.
* For targeted listener this class creates a dedicated semaphore with 1
* permit. For process listener this class creates a dedicated semaphore
* with a number of permits equal to scaleout (# of process threads).
* The listener thread calling onMessage() will try to acquire its
* semaphore and will block if no permits are available. The targeted listener
* is identified by value returned from message.getJMSType(). This is set
* in UimaDefaultMessageListenerContainer.doInvokeListener() before this
* class onMessage() is called.
*
*/
@SuppressWarnings("rawtypes")
public class PriorityMessageHandler implements SessionAwareMessageListener {
private PriorityBlockingQueue<MessageWrapper> queue =
new PriorityBlockingQueue<MessageWrapper>();
private Semaphore targetedListenerSemaphore = null;
private Semaphore processListenerSemaphore = null;
public PriorityMessageHandler(int scaleout) {
processListenerSemaphore =
new Semaphore(scaleout);
targetedListenerSemaphore =
new Semaphore(1);
}
public BlockingQueue<MessageWrapper> getQueue() {
return queue;
}
/**
* Wraps an incoming message in a protocol neutral envelope and adds priority
* value to enable sorting in a Priority Queue. Messages specifically targetting
* an instance of a service will be assigned high priority for processing.
*/
public void onMessage(Message message, Session session) throws JMSException {
Semaphore semaphore = null;
// System.out.println("................ PriorityMessageHandler.onMessage() - Thread ID:"+Thread.currentThread().getId());
// the JMSType is set by targeted listener in
// UimaDefaultMessageListenerContainer.doInvokeListener(). The process
// listener does not set this property.
if ( "TargetMessage".equals(message.getJMSType()) ) {
semaphore = targetedListenerSemaphore; // 1 permit
} else {
semaphore = processListenerSemaphore; // N permits
}
try {
// throttle the listener so that it doesn't enqueue more work
// than necessary.
semaphore.acquire();
} catch( InterruptedException e) {
System.out.println("Semaphore Interrupted ");
}
// wrap JMS message and add a semaphore so that the process thread can signal the JMS
// thread.
MessageWrapper m =
new MessageWrapper(message, session, semaphore, message.getJMSPriority());
// Add the message to a shared queue. This queue is shared with a custom thread
// factory where messages are consumed. If a consumer thread is not available
// the add() method will block to support throttling of incoming messages
queue.add(m);
}
}