| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.uima.adapter.jms.activemq; |
| |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Semaphore; |
| |
| import org.apache.activemq.ActiveMQConnectionFactory; |
| import org.apache.activemq.ActiveMQPrefetchPolicy; |
| import org.apache.activemq.command.ActiveMQQueue; |
| import org.apache.uima.UIMAFramework; |
| import org.apache.uima.aae.UIDGenerator; |
| import org.apache.uima.aae.controller.AggregateAnalysisEngineController; |
| import org.apache.uima.aae.controller.AnalysisEngineController; |
| import org.apache.uima.aae.controller.Controller; |
| import org.apache.uima.aae.controller.ControllerCallbackListener; |
| import org.apache.uima.aae.controller.ControllerLifecycle; |
| import org.apache.uima.aae.controller.Endpoint; |
| import org.apache.uima.aae.controller.UimacppServiceController; |
| import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState; |
| import org.apache.uima.adapter.jms.JmsConstants; |
| import org.apache.uima.resource.ResourceInitializationException; |
| import org.apache.uima.util.Level; |
| import org.springframework.context.ApplicationContext; |
| import org.springframework.context.support.FileSystemXmlApplicationContext; |
| import org.springframework.jms.support.destination.DestinationResolver; |
| |
| public class SpringContainerDeployer implements ControllerCallbackListener { |
| private static final Class CLASS_NAME = SpringContainerDeployer.class; |
| |
| private static final int MAX_PREFETCH_FOR_CAS_NOTIFICATION_Q = 10; |
| |
| public static final int QUIESCE_AND_STOP = 1000; |
| |
| public static final int STOP_NOW = 1001; |
| |
| private volatile boolean serviceInitializationCompleted; |
| |
| private volatile boolean serviceInitializationException; |
| |
| private Semaphore serviceInitializationSemaphore = new Semaphore(1); |
| |
| private ConcurrentHashMap springContainerRegistry = null; |
| |
| private FileSystemXmlApplicationContext context = null; |
| |
| private Object mux = new Object(); |
| |
| private AnalysisEngineController topLevelController = null; |
| |
| public SpringContainerDeployer() { |
| } |
| |
| public SpringContainerDeployer(ConcurrentHashMap aSpringContainerRegistry) { |
| springContainerRegistry = aSpringContainerRegistry; |
| } |
| |
| private UimaDefaultMessageListenerContainer produceListenerConnector(ActiveMQConnectionFactory cf) { |
| DestinationResolver resolver = new TempDestinationResolver(); |
| UimaDefaultMessageListenerContainer connector = new UimaDefaultMessageListenerContainer(true); |
| connector.setConnectionFactory(cf); |
| connector.setConcurrentConsumers(1); |
| connector.setDestinationResolver(resolver); |
| |
| connector.initializeContainer(); |
| synchronized (mux) { |
| while (connector.getListenerEndpoint() == null) { |
| try { |
| mux.wait(50); |
| } catch (InterruptedException e) { |
| } |
| } |
| |
| } |
| // connector.afterPropertiesSet(); |
| connector.start(); |
| return connector; |
| } |
| |
| private ActiveMQConnectionFactory getTopLevelQueueConnectionFactory(ApplicationContext ctx) { |
| ActiveMQConnectionFactory factory = null; |
| String[] inputChannelBeanIds = ctx |
| .getBeanNamesForType(org.apache.uima.adapter.jms.activemq.JmsInputChannel.class); |
| String beanId = null; |
| for (int i = 0; i < inputChannelBeanIds.length; i++) { |
| JmsInputChannel inputChannel = (JmsInputChannel) ctx.getBean(inputChannelBeanIds[i]); |
| if (inputChannel.getName().startsWith("top_level_input_queue_service")) { |
| while ((factory = ((JmsInputChannel) inputChannel).getConnectionFactory()) == null) { |
| try { |
| Thread.currentThread().sleep(50); |
| } catch (Exception e) { |
| } |
| } |
| break; |
| } |
| } |
| return factory; |
| } |
| |
| private ActiveMQPrefetchPolicy getPrefetchPolicy(int aPrefetchSize) { |
| ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); |
| prefetchPolicy.setQueuePrefetch(aPrefetchSize); |
| return prefetchPolicy; |
| } |
| |
| private ActiveMQConnectionFactory getNewConnectionFactory(ApplicationContext ctx) |
| throws Exception { |
| ActiveMQConnectionFactory factory = getTopLevelQueueConnectionFactory(ctx); |
| if (factory != null) { |
| String brokerURL = factory.getBrokerURL(); |
| factory = new ActiveMQConnectionFactory(brokerURL); |
| } |
| return factory; |
| } |
| |
| private int getConcurrentConsumerCount(ApplicationContext ctx) { |
| String[] listenerBeanIds = ctx |
| .getBeanNamesForType(org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer.class); |
| String beanId = null; |
| int concurrentConsumerCount = -1; |
| for (int i = 0; i < listenerBeanIds.length; i++) { |
| UimaDefaultMessageListenerContainer lsnr = (UimaDefaultMessageListenerContainer) ctx |
| .getBean(listenerBeanIds[i]); |
| if (lsnr.getDestinationName().startsWith("asynAggr_retQ")) { |
| return lsnr.getConcurrentConsumers(); |
| } |
| } |
| |
| return -1; |
| |
| } |
| |
| public AnalysisEngineController getTopLevelController() { |
| return topLevelController; |
| } |
| |
| private void initializeTopLevelController(AnalysisEngineController cntlr, ApplicationContext ctx) |
| throws Exception { |
| ((FileSystemXmlApplicationContext) ctx).setDisplayName(cntlr.getComponentName()); |
| cntlr.addControllerCallbackListener(this); |
| |
| String inputQueueName = cntlr.getServiceEndpointName(); |
| if (inputQueueName != null) { |
| if (ctx.containsBean(inputQueueName)) { |
| ActiveMQQueue queue = (ActiveMQQueue) ctx.getBean(inputQueueName); |
| if (cntlr.getServiceInfo() != null) { |
| cntlr.getServiceInfo().setInputQueueName(queue.getQueueName()); |
| } |
| } else { |
| if (cntlr.getServiceInfo() != null) { |
| cntlr.getServiceInfo().setInputQueueName(inputQueueName); |
| } |
| } |
| } |
| |
| // If this is a Cas Multiplier add a special temp queue for receiving Free CAS |
| // notifications. |
| if (cntlr.isCasMultiplier()) { |
| |
| ActiveMQConnectionFactory cf = getNewConnectionFactory(ctx); // getTopLevelQueueConnectionFactory( |
| // ctx ); |
| ActiveMQPrefetchPolicy prefetchPolicy = getPrefetchPolicy(MAX_PREFETCH_FOR_CAS_NOTIFICATION_Q); |
| cf.setPrefetchPolicy(prefetchPolicy); |
| // Create a listener and a temp queue for Free CAS notifications. |
| UimaDefaultMessageListenerContainer connector = produceListenerConnector(cf); |
| System.out.println(">>>> Cas Multiplier Controller:" + cntlr.getComponentName() |
| + " Activated Listener to Receive Free CAS Notifications - Temp Queue Name:" |
| + connector.getEndpointName()); |
| // Direct all messages to the InputChannel |
| connector.setMessageListener(((JmsInputChannel) cntlr.getInputChannel())); |
| ((JmsInputChannel) cntlr.getInputChannel()).setListenerContainer(connector); |
| // Save the temp queue reference in the Output Channel. The output channel will |
| // add this queue to every outgoing message containing a CAS generated by the |
| // Cas Multiplier. |
| ((JmsOutputChannel) cntlr.getOutputChannel()) |
| .setFreeCasQueue(connector.getListenerEndpoint()); |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), |
| "initializeTopLevelController", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAJMS_activated_fcq__CONFIG", |
| new Object[] { cntlr.getComponentName(), connector.getEndpointName() }); |
| } |
| } |
| |
| if (cntlr instanceof AggregateAnalysisEngineController) { |
| // Get a map of delegates for the top level aggregate |
| Map destinationMap = ((AggregateAnalysisEngineController) cntlr).getDestinations(); |
| Set set = destinationMap.entrySet(); |
| // iterate over endpoints (delegates) to find those that |
| // need to reply to a temp queue. |
| for (Iterator it = set.iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| Endpoint endpoint = (Endpoint) entry.getValue(); |
| |
| // Will this endpoint reply to a temp queue |
| if (endpoint != null && endpoint.isTempReplyDestination()) { |
| // block here until the Resolver creates a temp |
| // queue and the endpoint initialization is |
| // complete. When the resolver creates a temp quee it will |
| // notify a listener container with a reference |
| // to the temp queue. The listener container will in |
| // turn call setDestination on the endpoint to inject |
| // the destination object ( a temp queue) for the |
| // delegate to reply to. |
| while (!endpoint.replyDestinationFailed() && endpoint.getDestination() == null) { |
| synchronized (endpoint) { |
| try { |
| endpoint.wait(200); |
| } catch (InterruptedException e) { |
| } |
| } |
| } |
| System.out.println("Remote Delegate " + endpoint.getDelegateKey() + " Reply Queue:" |
| + endpoint.getDestination()); |
| |
| } |
| } |
| int concurrentConsumerCountOnReplies = getConcurrentConsumerCount(ctx); |
| // Configure and initialize vm transport in the top level aggregate. |
| // The aggregate will initialize all delegates with the vm transport. |
| ((AggregateAnalysisEngineController) cntlr) |
| .initializeVMTransport(concurrentConsumerCountOnReplies); |
| // Complete initialization of the aggregate by sending |
| // getMeta requests to |
| // all remote delegates (if any). Collocated delegates |
| // have already |
| // returned their metadata to the aggregate. |
| ((AggregateAnalysisEngineController) cntlr).sendRequestForMetadataToRemoteDelegates(); |
| } |
| |
| } |
| |
| private String initializeContainer(ApplicationContext ctx) throws Exception { |
| serviceInitializationCompleted = false; |
| serviceInitializationException = false; |
| // Wrap Spring context |
| UimaEEAdminSpringContext springAdminContext = new UimaEEAdminSpringContext( |
| (FileSystemXmlApplicationContext) ctx); |
| // Find all deployed Controllers |
| String[] controllers = ctx |
| .getBeanNamesForType(org.apache.uima.aae.controller.AnalysisEngineController.class); |
| for (int i = 0; controllers != null && i < controllers.length; i++) { |
| AnalysisEngineController cntlr = (AnalysisEngineController) ctx.getBean(controllers[i]); |
| if (cntlr instanceof org.apache.uima.aae.controller.UimacppServiceController) { |
| cntlr.addControllerCallbackListener(this); |
| topLevelController = cntlr; |
| } else { |
| // Pass a reference to the context to each of the Controllers |
| cntlr.setUimaEEAdminContext(springAdminContext); |
| if (cntlr.isTopLevelComponent()) { |
| topLevelController = cntlr; |
| initializeTopLevelController(cntlr, ctx); |
| } |
| } |
| } |
| // grab the semaphore so that waitForServiceInitialization() blocks until the |
| // semaphore is released. |
| try { |
| serviceInitializationSemaphore.acquire(); |
| } catch (InterruptedException e) { |
| } |
| try { |
| // blocks until the top level controller sends a notification. |
| // Notification is send |
| // when either the controller successfully initialized or it failed |
| // during initialization |
| waitForServiceNotification(); |
| // success, the service initialized |
| startListeners(); |
| } catch (Exception e) { |
| // Query the container for objects that implement |
| // ControllerLifecycle interface. These |
| // objects are typically of type AnalysisEngineController or |
| // UimacppServiceController. |
| String[] asyncServiceList = ctx |
| .getBeanNamesForType(org.apache.uima.aae.controller.ControllerLifecycle.class); |
| // Given a valid list of controllers select the first from the list |
| // and |
| // initiate a shutdown. We don't care which controller will be |
| // invoked. In case of |
| // AggregateAnalysisEngineController the terminate event will |
| // propagate all the way |
| // to the top controller in the hierarchy and the shutdown will take |
| // place from there. |
| // If the controller is of kind UimecppServiceController or |
| // PrimitiveAnalysisController |
| // the termination logic will be immediately triggered in the |
| // terminate() method. |
| if (asyncServiceList != null && asyncServiceList.length > 0) { |
| |
| ControllerLifecycle ctrer = (ControllerLifecycle) ctx.getBean(asyncServiceList[0]); |
| // Send a trigger to initiate shutdown. |
| if (ctrer instanceof AnalysisEngineController) { |
| ((AnalysisEngineController) ctrer).getControllerLatch().release(); |
| } |
| |
| ctrer.terminate(); |
| } |
| |
| if (ctx instanceof FileSystemXmlApplicationContext) { |
| ((FileSystemXmlApplicationContext) ctx).destroy(); |
| } |
| throw e; |
| } |
| String containerId = new UIDGenerator().nextId(); |
| if (springContainerRegistry != null) { |
| // Register each container in a global map. When stopping the |
| // client, each container in the map will be stopped. |
| springContainerRegistry.put(containerId, springAdminContext); |
| } |
| return containerId; |
| } |
| /** |
| * Starts Spring listeners on input queues. These listeners are not auto started |
| * during initialization. Only after successful service initialization these listeners are |
| * activated. |
| * |
| * @param listeners - list of listeners to start |
| * @param ctx - spring context |
| * @throws Exception |
| */ |
| private void doStartListeners(String[] listeners, FileSystemXmlApplicationContext ctx ) throws Exception { |
| for (int i = 0; listeners != null && i < listeners.length; i++) { |
| UimaDefaultMessageListenerContainer listener = |
| (UimaDefaultMessageListenerContainer) ctx.getBean(listeners[i]); |
| // Only start those listeners that are not running yet. |
| if ( listener != null && !listener.isRunning()) { |
| if ( topLevelController != null ) { |
| System.out.println("Controller:"+topLevelController.getComponentName()+" Activating Listener on Queue:"+listener.getDestination()+" Selector:"+listener.getMessageSelector()+" Broker:"+listener.getBrokerUrl()); |
| topLevelController.changeState(ServiceState.RUNNING); |
| } |
| listener.start(); |
| } |
| } |
| } |
| public void startListeners() throws Exception { |
| if ( springContainerRegistry == null || springContainerRegistry.size() == 0) { |
| if ( context != null ) { |
| String[] listeners = context |
| .getBeanNamesForType(org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer.class); |
| doStartListeners(listeners, context); |
| } |
| } else { |
| Iterator registryIterator = springContainerRegistry.keySet().iterator(); |
| while( registryIterator.hasNext()) { |
| String id = (String)registryIterator.next(); |
| UimaEEAdminSpringContext springAdminContext = (UimaEEAdminSpringContext)springContainerRegistry.get(id); |
| if ( springAdminContext != null ) { |
| FileSystemXmlApplicationContext ctx = |
| (FileSystemXmlApplicationContext)springAdminContext.getSpringContainer(); |
| if ( ctx != null ) { |
| String[] listeners = ctx |
| .getBeanNamesForType(org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer.class); |
| doStartListeners(listeners, ctx); |
| } |
| } |
| } |
| } |
| } |
| public String deploy(String springContextFile) throws ResourceInitializationException { |
| if (springContextFile == null) { |
| throw new ResourceInitializationException(new Exception("Spring Context File Not Specified")); |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "deploy", |
| JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_deploy_container__CONFIG", |
| new Object[] { springContextFile }); |
| } |
| try { |
| // Deploy beans in the Spring container. Although the deployment is |
| // synchronous ( one bean at a time), some beans run in a separate |
| // threads. The completion of container deployment doesnt |
| // necessarily mean that all beans have initialized completely. |
| if (!springContextFile.startsWith("file:")) { |
| springContextFile = "file:" + springContextFile; |
| } |
| context = new FileSystemXmlApplicationContext(springContextFile); |
| return initializeContainer(context); |
| } catch (ResourceInitializationException e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "deploy", |
| JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", |
| new Object[] { Thread.currentThread().getId(), e }); |
| } |
| throw e; |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "deploy", |
| JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", |
| new Object[] { Thread.currentThread().getId(), e }); |
| } |
| throw new ResourceInitializationException(e); |
| } |
| |
| } |
| |
| public String deploy(String[] springContextFiles) throws ResourceInitializationException { |
| if (springContextFiles == null) { |
| throw new ResourceInitializationException(new Exception("Spring Context File List is Empty")); |
| } |
| // Log context files |
| for (int i = 0; i < springContextFiles.length; i++) { |
| if (!springContextFiles[i].startsWith("file:")) { |
| springContextFiles[i] = "file:" + springContextFiles[i]; |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), |
| "deploy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAJMS_deploy_container__CONFIG", new Object[] { springContextFiles[i] }); |
| } |
| } |
| try { |
| // Deploy beans in the Spring container. Although the deployment is |
| // synchronous ( one bean at a time), some beans run in a separate |
| // threads. The completion of container deployment doesnt |
| // necessarily mean that all beans have initialized completely. |
| context = new FileSystemXmlApplicationContext(springContextFiles); |
| return initializeContainer(context); |
| } catch (ResourceInitializationException e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), |
| "deploy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e }); |
| } |
| throw e; |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), |
| "deploy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e }); |
| } |
| throw new ResourceInitializationException(e); |
| } |
| |
| } |
| |
| public void undeploy(int stop_level) throws Exception { |
| switch (stop_level) { |
| case QUIESCE_AND_STOP: |
| getTopLevelController().quiesceAndStop(); |
| break; |
| |
| case STOP_NOW: |
| getTopLevelController().terminate(); |
| break; |
| |
| default: |
| throw new UnsupportedOperationException( |
| "Unsupported argument value in the undeploy() call. Please use stop level " |
| + QUIESCE_AND_STOP + " OR " + STOP_NOW |
| + " as an argument to undeploy() method."); |
| } |
| } |
| |
| protected void waitForServiceNotification() throws Exception { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "waitForServiceNotification", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAJMS_awaiting_container_init__INFO", new Object[] {}); |
| } |
| try { |
| serviceInitializationSemaphore.acquire(); |
| } catch (InterruptedException e) { |
| } finally { |
| serviceInitializationSemaphore.release(); |
| } |
| if (serviceInitializationException) { |
| throw new ResourceInitializationException(new Exception("Details of failure(s) written to UIMA log")); |
| } |
| } |
| |
| public void notifyOnInitializationFailure(AnalysisEngineController aController, Exception e) { |
| |
| // Initialization exception. Notify blocking thread and indicate a |
| // problem |
| serviceInitializationException = true; |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), |
| "notifyOnInitializationFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAJMS_container_init_exception__WARNING", new Object[] { e }); |
| } |
| serviceInitializationSemaphore.release(); |
| } |
| |
| public void notifyOnInitializationSuccess(AnalysisEngineController aController) { |
| serviceInitializationCompleted = true; |
| serviceInitializationSemaphore.release(); |
| } |
| |
| public void notifyOnInitializationFailure(Exception e) { |
| notifyOnInitializationFailure(null, e); |
| } |
| |
| public void notifyOnInitializationSuccess() { |
| notifyOnInitializationSuccess(null); |
| } |
| |
| public void notifyOnTermination(String message) { |
| System.out.println("-------------------> Container Terminated"); |
| } |
| |
| public FileSystemXmlApplicationContext getSpringContext() { |
| return context; |
| } |
| |
| public boolean isInitialized() { |
| return serviceInitializationCompleted; |
| } |
| |
| public boolean initializationFailed() { |
| return serviceInitializationException; |
| } |
| } |