| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.uima.aae.handler.input; |
| |
| import org.apache.uima.UIMAFramework; |
| import org.apache.uima.aae.UIMAEE_Constants; |
| import org.apache.uima.aae.UimaSerializer; |
| import org.apache.uima.aae.InProcessCache.CacheEntry; |
| import org.apache.uima.aae.controller.AggregateAnalysisEngineController; |
| import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl; |
| import org.apache.uima.aae.controller.Endpoint; |
| import org.apache.uima.aae.controller.Endpoint_impl; |
| import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController; |
| import org.apache.uima.aae.controller.LocalCache.CasStateEntry; |
| import org.apache.uima.aae.delegate.Delegate; |
| import org.apache.uima.aae.error.AsynchAEException; |
| import org.apache.uima.aae.error.ErrorContext; |
| import org.apache.uima.aae.error.InvalidMessageException; |
| import org.apache.uima.aae.handler.HandlerBase; |
| import org.apache.uima.aae.jmx.ServicePerformance; |
| import org.apache.uima.aae.message.AsynchAEMessage; |
| import org.apache.uima.aae.message.MessageContext; |
| import org.apache.uima.aae.monitor.Monitor; |
| import org.apache.uima.aae.monitor.statistics.DelegateStats; |
| import org.apache.uima.aae.monitor.statistics.LongNumericStatistic; |
| import org.apache.uima.aae.monitor.statistics.TimerStats; |
| import org.apache.uima.analysis_engine.asb.impl.FlowContainer; |
| import org.apache.uima.cas.CAS; |
| import org.apache.uima.cas.Marker; |
| import org.apache.uima.cas.impl.XmiSerializationSharedData; |
| import org.apache.uima.util.Level; |
| |
| public class ProcessRequestHandler_impl extends HandlerBase { |
| private static final Class CLASS_NAME = ProcessRequestHandler_impl.class; |
| |
| private Object mux = new Object(); |
| |
| private UimaSerializer uimaSerializer = new UimaSerializer(); |
| |
| public ProcessRequestHandler_impl(String aName) { |
| super(aName); |
| } |
| |
| private void cacheStats(String aCasReferenceId, long aTimeWaitingForCAS, |
| long aTimeToDeserializeCAS) throws Exception { |
| CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId); |
| entry.incrementTimeWaitingForCAS(aTimeWaitingForCAS); |
| entry.incrementTimeToDeserializeCAS(aTimeToDeserializeCAS); |
| } |
| |
| private boolean messageContainsXMI(MessageContext aMessageContext, String casReferenceId) |
| throws Exception { |
| // Fetch serialized CAS from the message |
| String xmi = aMessageContext.getStringMessage(); |
| // ***************************************************************** |
| // ***** NO XMI In Message. Kick this back to sender with exception |
| // ***************************************************************** |
| if (xmi == null) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_message_has_no_cargo__INFO", |
| new Object[] { aMessageContext.getEndpoint().getEndpoint() }); |
| } |
| CasStateEntry stateEntry = null; |
| String parentCasReferenceId = null; |
| try { |
| stateEntry = getController().getLocalCache().lookupEntry(casReferenceId); |
| if (stateEntry != null && stateEntry.isSubordinate()) { |
| CasStateEntry topParentEntry = getController().getLocalCache().getTopCasAncestor( |
| casReferenceId); |
| parentCasReferenceId = topParentEntry.getCasReferenceId(); |
| } |
| } catch (Exception e) { |
| } |
| |
| getController().getOutputChannel().sendReply( |
| new InvalidMessageException("No XMI data in message"), casReferenceId, |
| parentCasReferenceId, aMessageContext.getEndpoint(), AsynchAEMessage.Process); |
| // Dont process this empty message |
| return false; |
| } |
| return true; |
| } |
| |
| private synchronized CAS getCAS(boolean fetchCASFromShadowCasPool, String shadowCasPoolKey, |
| String casReceivedFrom) { |
| CAS cas = null; |
| // If this is a new CAS (generated by a CM), fetch a CAS from a Shadow Cas Pool associated with |
| // a CM that |
| // produced the CAS. Each CM will have its own Shadow Cas Pool |
| if (fetchCASFromShadowCasPool) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "getCAS", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_cm__FINE", |
| new Object[] { shadowCasPoolKey }); |
| } |
| // Aggregate time spent waiting for a CAS in the shadow cas pool |
| ((AggregateAnalysisEngineController) getController()).getDelegateServicePerformance( |
| shadowCasPoolKey).beginWaitOnShadowCASPool(); |
| cas = getController().getCasManagerWrapper().getNewCas(shadowCasPoolKey); |
| ((AggregateAnalysisEngineController) getController()).getDelegateServicePerformance( |
| shadowCasPoolKey).endWaitOnShadowCASPool(); |
| |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "getCAS", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted_cm__FINE", |
| new Object[] { shadowCasPoolKey }); |
| } |
| } else { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "getCAS", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas__FINE", |
| new Object[] { casReceivedFrom }); |
| } |
| // Aggregate time spent waiting for a CAS in the service cas pool |
| getController().getServicePerformance().beginWaitOnCASPool(); |
| |
| cas = getController().getCasManagerWrapper().getNewCas(); |
| getController().getServicePerformance().endWaitOnCASPool(); |
| ServicePerformance sp = getController().getServicePerformance(); |
| sp.incrementCasPoolWaitTime(sp.getTimeWaitingForCAS()); |
| |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "getCAS", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted__FINE", |
| new Object[] { casReceivedFrom }); |
| } |
| } |
| return cas; |
| } |
| |
| /** |
| * |
| * @param casReferenceId |
| * @param freeCasEndpoint |
| * @param shadowCasPoolKey |
| * @param aMessageContext |
| * @return |
| * @throws Exception |
| */ |
| private CacheEntry deserializeCASandRegisterWithCache(String casReferenceId, |
| Endpoint freeCasEndpoint, String shadowCasPoolKey, MessageContext aMessageContext) |
| throws Exception { |
| long inTime = System.nanoTime(); |
| boolean casRegistered = false; |
| |
| // Time how long we wait on Cas Pool to fetch a new CAS |
| long t1 = getController().getCpuTime(); |
| // ************************************************************************* |
| // Fetch CAS from a Cas Pool. If the CAS came from a Cas Multiplier |
| // fetch the CAS from a shadow CAS pool. Otherwise, fetch the CAS |
| // from the service CAS Pool. |
| // ************************************************************************* |
| Endpoint endpoint = aMessageContext.getEndpoint(); |
| |
| CAS cas = getCAS(aMessageContext.propertyExists(AsynchAEMessage.CasSequence), shadowCasPoolKey, |
| endpoint.getEndpoint()); |
| long timeWaitingForCAS = getController().getCpuTime() - t1; |
| // Check if we are still running |
| if (getController().isStopped()) { |
| // The Controller is in shutdown state. |
| getController().dropCAS(cas); |
| return null; |
| } |
| // ************************************************************************* |
| // Deserialize CAS from the message |
| // ************************************************************************* |
| t1 = getController().getCpuTime(); |
| String serializationStrategy = endpoint.getSerializer(); |
| XmiSerializationSharedData deserSharedData = null; |
| CacheEntry entry = null; |
| if (serializationStrategy.equals("xmi")) { |
| // Fetch serialized CAS from the message |
| String xmi = aMessageContext.getStringMessage(); |
| deserSharedData = new XmiSerializationSharedData(); |
| // UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1); |
| uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1); |
| } else if (serializationStrategy.equals("binary")) { |
| // ************************************************************************* |
| // Register the CAS with a local cache |
| // ************************************************************************* |
| // CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext, |
| // deserSharedData, casReferenceId); |
| byte[] binarySource = aMessageContext.getByteMessage(); |
| // UimaSerializer.deserializeCasFromBinary(binarySource, cas); |
| uimaSerializer.deserializeCasFromBinary(binarySource, cas); |
| } |
| |
| // ************************************************************************* |
| // Check and set up for Delta CAS reply |
| // ************************************************************************* |
| boolean acceptsDeltaCas = false; |
| Marker marker = null; |
| if (aMessageContext.propertyExists(AsynchAEMessage.AcceptsDeltaCas)) { |
| acceptsDeltaCas = aMessageContext.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas); |
| if (acceptsDeltaCas) { |
| marker = cas.createMarker(); |
| } |
| } |
| // ************************************************************************* |
| // Register the CAS with a local cache |
| // ************************************************************************* |
| // CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext, |
| // deserSharedData, casReferenceId); |
| entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, |
| casReferenceId, marker, acceptsDeltaCas); |
| |
| long timeToDeserializeCAS = getController().getCpuTime() - t1; |
| getController().incrementDeserializationTime(timeToDeserializeCAS); |
| LongNumericStatistic statistic; |
| if ((statistic = getController().getMonitor().getLongNumericStatistic("", |
| Monitor.TotalDeserializeTime)) != null) { |
| statistic.increment(timeToDeserializeCAS); |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), |
| "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_deserialize_cas_time_FINE", |
| new Object[] { (double) timeToDeserializeCAS / 1000000.0 }); |
| } |
| |
| // Update Stats |
| ServicePerformance casStats = getController().getCasStatistics(casReferenceId); |
| casStats.incrementCasDeserializationTime(timeToDeserializeCAS); |
| if (getController().isTopLevelComponent()) { |
| synchronized (mux) { |
| getController().getServicePerformance().incrementCasDeserializationTime( |
| timeToDeserializeCAS); |
| } |
| } |
| getController().saveTime(inTime, casReferenceId, getController().getName()); |
| |
| if (getController() instanceof AggregateAnalysisEngineController) { |
| // If the message came from a Cas Multiplier, associate the input/parent CAS id with this CAS |
| if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) { |
| // Fetch parent CAS id |
| String inputCasReferenceId = aMessageContext |
| .getMessageStringProperty(AsynchAEMessage.InputCasReference); |
| if (shadowCasPoolKey != null) { |
| // Save the key of the Cas Multiplier in the cache. It will be now known which Cas |
| // Multiplier produced this CAS |
| entry.setCasProducerKey(shadowCasPoolKey); |
| } |
| // associate this subordinate CAS with the parent CAS |
| entry.setInputCasReferenceId(inputCasReferenceId); |
| // Save a Cas Multiplier endpoint where a Free CAS notification will be sent |
| entry.setFreeCasEndpoint(freeCasEndpoint); |
| cacheStats(inputCasReferenceId, timeWaitingForCAS, timeToDeserializeCAS); |
| } else { |
| cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS); |
| } |
| DelegateStats stats = new DelegateStats(); |
| if (entry.getStat() == null) { |
| entry.setStat(stats); |
| // Add entry for self (this aggregate). MessageContext.getEndpointName() |
| // returns the name of the queue receiving the message. |
| stats.put(getController().getServiceEndpointName(), new TimerStats()); |
| } else { |
| if (!stats.containsKey(getController().getServiceEndpointName())) { |
| stats.put(getController().getServiceEndpointName(), new DelegateStats()); |
| } |
| } |
| } else { |
| cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS); |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), |
| "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_deserialized_cas_ready_to_process_FINE", |
| new Object[] { aMessageContext.getEndpoint().getEndpoint() }); |
| } |
| cacheProcessCommandInClientEndpoint(); |
| return entry; |
| } |
| |
| private String getCasReferenceId(MessageContext aMessageContext) throws Exception { |
| if (!aMessageContext.propertyExists(AsynchAEMessage.CasReference)) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_message_has_cas_refid__INFO", |
| new Object[] { aMessageContext.getEndpoint().getEndpoint() }); |
| } |
| getController().getOutputChannel().sendReply( |
| new InvalidMessageException("No Cas Reference Id Received From Delegate In message"), |
| null, null, aMessageContext.getEndpoint(), AsynchAEMessage.Process); |
| return null; |
| } |
| return aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference); |
| } |
| |
| /** |
| * Handles process request from a remote client |
| * |
| * @param aMessageContext |
| * - contains a message from UIMA-AS Client |
| * @throws AsynchAEException |
| */ |
| private void handleProcessRequestFromRemoteClient(MessageContext aMessageContext) |
| throws AsynchAEException { |
| CacheEntry entry = null; |
| String casReferenceId = null; |
| // Check if there is a cargo in the message |
| if (aMessageContext.getMessageIntProperty(AsynchAEMessage.Payload) == AsynchAEMessage.XMIPayload |
| && aMessageContext.getStringMessage() == null) { |
| return; // No XMI just return |
| } |
| |
| try { |
| |
| String newCASProducedBy = null; |
| // Get the CAS Reference Id of the input CAS |
| // Fetch id of the CAS from the message. If it doesnt exist the method will create an entry in |
| // the log file and return null |
| casReferenceId = getCasReferenceId(aMessageContext); |
| if (casReferenceId == null) { |
| return; // Invalid message. Nothing to do |
| } |
| // Initially make both equal |
| String inputCasReferenceId = casReferenceId; |
| // Destination where Free Cas Notification will be sent if the CAS came from a Cas Multiplier |
| Endpoint freeCasEndpoint = null; |
| |
| CasStateEntry inputCasStateEntry = null; |
| |
| // CASes generated by a Cas Multiplier will have a CasSequence property set. |
| if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) { |
| // Fetch the name of the Cas Multiplier's input queue |
| // String cmEndpointName = aMessageContext.getEndpoint().getEndpoint(); |
| String cmEndpointName = aMessageContext |
| .getMessageStringProperty(AsynchAEMessage.MessageFrom); |
| newCASProducedBy = ((AggregateAnalysisEngineController) getController()) |
| .lookUpDelegateKey(cmEndpointName); |
| // Fetch an ID of the parent CAS |
| inputCasReferenceId = aMessageContext |
| .getMessageStringProperty(AsynchAEMessage.InputCasReference); |
| // Fetch Cache entry for the parent CAS |
| CacheEntry inputCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS( |
| inputCasReferenceId); |
| // Fetch an endpoint where Free CAS Notification must be sent. |
| // This endpoint is unique per CM instance. Meaning, each |
| // instance of CM will have an endpoint where it expects Free CAS |
| // notifications. |
| freeCasEndpoint = aMessageContext.getEndpoint(); |
| // Clone an endpoint where Free Cas Request will be sent |
| freeCasEndpoint = (Endpoint) ((Endpoint_impl) freeCasEndpoint).clone(); |
| |
| if (getController() instanceof AggregateAnalysisEngineController) { |
| inputCasStateEntry = ((AggregateAnalysisEngineController) getController()) |
| .getLocalCache().lookupEntry(inputCasReferenceId); |
| |
| // Associate Free Cas Notification Endpoint with an input Cas |
| inputCasStateEntry.setFreeCasNotificationEndpoint(freeCasEndpoint); |
| } |
| |
| computeStats(aMessageContext, inputCasReferenceId); |
| // Reset the destination |
| aMessageContext.getEndpoint().setDestination(null); |
| // This CAS came in from a CAS Multiplier. Treat it differently than the |
| // input CAS. In case the Aggregate needs to send this CAS to the |
| // client, retrieve the client destination by looking up the client endpoint |
| // using input CAS reference id. CASes generated by the CAS multiplier will have |
| // the same Cas Reference id. |
| Endpoint replyToEndpoint = inputCasCacheEntry.getMessageOrigin(); |
| // The message context contains a Cas Multiplier endpoint. Since |
| // we dont want to send a generated CAS back to the CM, override |
| // with an endpoint provided by the client of |
| // this service. Client endpoint is attached to an input Cas cache entry. |
| aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint()); |
| aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI()); |
| |
| // Before sending a CAS to Cas Multiplier, the aggregate has |
| // saved the CM key in the CAS cache entry. Fetch the key |
| // of the CM so that we can ask the right Shadow Cas Pool for |
| // a new CAS. Every Shadow Cas Pool has a unique id which |
| // corresponds to a Cas Multiplier key. |
| // newCASProducedBy = inputCasCacheEntry.getCasMultiplierKey(); |
| if (getController() instanceof AggregateAnalysisEngineController) { |
| Endpoint casMultiplierEndpoint = ((AggregateAnalysisEngineController) getController()) |
| .lookUpEndpoint(newCASProducedBy, false); |
| if (casMultiplierEndpoint != null) { |
| // Save the URL of the broker managing the Free Cas Notification queue. |
| // This is needed when we try to establish a connection to the broker. |
| freeCasEndpoint.setServerURI(casMultiplierEndpoint.getServerURI()); |
| } |
| } |
| } else if (getController().isTopLevelComponent()) { |
| if (getController() instanceof AggregateAnalysisEngineController) { |
| ((AggregateAnalysisEngineController) getController()).addMessageOrigin(casReferenceId, |
| aMessageContext.getEndpoint()); |
| } |
| if (getController().isCasMultiplier()) { |
| // Send an ack to the client. The ack message will include a FreeCasQueue |
| // to enable the client to send messages to the service processing a CAS. |
| try { |
| getController().getOutputChannel().sendReply(AsynchAEMessage.ServiceInfo, |
| aMessageContext.getEndpoint(), casReferenceId); |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), |
| "handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| } |
| } |
| } |
| } |
| // To prevent processing multiple messages with the same CasReferenceId, check the CAS cache |
| // to see if the message with a given CasReferenceId is already being processed. It is, the |
| // message contains the same request possibly issued by the caller due to timeout. Also this |
| // mechanism helps with dealing with scenario when this service is not up when the client |
| // sends |
| // request. The client can keep re-sending the same request until its timeout thresholds are |
| // exceeded. By that time, there may be multiple messages in this service queue with the same |
| // CasReferenceId. When the service finally comes back up, it will have multiple messages in |
| // its queue possibly from the same client. Only the first message for any given |
| // CasReferenceId |
| // should be processed. |
| if (!getController().getInProcessCache().entryExists(casReferenceId)) { |
| CasStateEntry cse = null; |
| if (getController().getLocalCache().lookupEntry(casReferenceId) == null) { |
| // Create a new entry in the local cache for the CAS received from the remote |
| cse = getController().getLocalCache().createCasStateEntry(casReferenceId); |
| // Check if this CAS is a child |
| if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) { |
| cse.setInputCasReferenceId(inputCasReferenceId); |
| } |
| } else { |
| cse = getController().getLocalCache().lookupEntry(casReferenceId); |
| } |
| |
| if (getController() instanceof AggregateAnalysisEngineController |
| && aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) { |
| String delegateInputQueueName = aMessageContext |
| .getMessageStringProperty(AsynchAEMessage.MessageFrom); |
| String delegateKey = ((AggregateAnalysisEngineController) getController()) |
| .lookUpDelegateKey(delegateInputQueueName); // aMessageContext.getEndpoint().getEndpoint()); |
| if (delegateKey != null) { |
| Delegate delegate = ((AggregateAnalysisEngineController) getController()) |
| .lookupDelegate(delegateKey); |
| // Save the last delegate handling this CAS |
| cse.setLastDelegate(delegate); |
| // If there is one thread receiving messages from Cas Multiplier increment number of |
| // child Cases |
| // of the parent CAS. If there are more threads (consumers) a special object |
| // ConcurrentMessageListener |
| // has already incremented the count. This special object enforces order of processing |
| // for CASes |
| // coming in from the Cas Multiplier. |
| if (!delegate.hasConcurrentConsumersOnReplyQueue()) { |
| inputCasStateEntry.incrementSubordinateCasInPlayCount(); |
| } |
| } |
| } |
| |
| entry = deserializeCASandRegisterWithCache(casReferenceId, freeCasEndpoint, |
| newCASProducedBy, aMessageContext); |
| if (getController().isStopped() || entry == null || entry.getCas() == null) { |
| if (entry != null) { |
| // The Controller is in shutdown state, release the CAS |
| getController().dropCAS(entry.getCasReferenceId(), true); |
| entry = null; |
| } |
| return; |
| } |
| // ***************************************************************** |
| // Process the CAS |
| // ***************************************************************** |
| invokeProcess(entry.getCas(), inputCasReferenceId, casReferenceId, aMessageContext, |
| newCASProducedBy); |
| } else { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_duplicate_request__INFO", new Object[] { casReferenceId }); |
| } |
| } |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), |
| "handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", e); |
| } |
| ErrorContext errorContext = new ErrorContext(); |
| errorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint()); |
| errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process); |
| errorContext.add(AsynchAEMessage.CasReference, casReferenceId); |
| if (entry != null) { |
| getController().dropCAS(entry.getCas()); |
| } |
| getController().getErrorHandlerChain().handle(e, errorContext, getController()); |
| } |
| |
| } |
| |
| private void handleProcessRequestWithCASReference(MessageContext aMessageContext) |
| throws AsynchAEException { |
| boolean isNewCAS = false; |
| String newCASProducedBy = null; |
| |
| try { |
| // This is only used when handling CASes produced by CAS Multiplier |
| String inputCasReferenceId = null; |
| CAS cas = null; |
| CasStateEntry cse = null; |
| String casReferenceId = getCasReferenceId(aMessageContext); |
| if ((cse = getController().getLocalCache().lookupEntry(casReferenceId)) == null) { |
| // Create a new entry in the local cache for the CAS received from the remote |
| cse = getController().getLocalCache().createCasStateEntry(casReferenceId); |
| } |
| |
| // Check if this Cas has been sent from a Cas Multiplier. If so, its sequence will be > 0 |
| if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) { |
| isNewCAS = true; |
| |
| Endpoint casMultiplierEndpoint = aMessageContext.getEndpoint(); |
| |
| if (casMultiplierEndpoint == null) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "handleProcessRequestWithCASReference", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_endpoint_for_reply__INFO", |
| new Object[] { casReferenceId }); |
| } |
| return; |
| } |
| // Get the id of the parent Cas |
| inputCasReferenceId = aMessageContext |
| .getMessageStringProperty(AsynchAEMessage.InputCasReference); |
| if (cse.getInputCasReferenceId() == null) { |
| cse.setInputCasReferenceId(inputCasReferenceId); |
| } |
| |
| if (getController() instanceof AggregateAnalysisEngineController) { |
| CasStateEntry parentCasEntry = getController().getLocalCache().lookupEntry( |
| inputCasReferenceId); |
| // Check if the parent CAS is in a failed state first |
| if (parentCasEntry != null && parentCasEntry.isFailed()) { |
| // handle CAS release |
| getController().process(null, casReferenceId); |
| return; |
| } |
| |
| String delegateKey = ((AggregateAnalysisEngineController) getController()) |
| .lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint()); |
| Delegate delegate = ((AggregateAnalysisEngineController) getController()) |
| .lookupDelegate(delegateKey); |
| cse.setLastDelegate(delegate); |
| newCASProducedBy = delegate.getKey(); |
| casMultiplierEndpoint.setIsCasMultiplier(true); |
| try { |
| // Save the endpoint of the CM which produced the Cas |
| getController().getInProcessCache().setCasProducer(casReferenceId, newCASProducedBy); |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), |
| "handleProcessRequestWithCASReference", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", |
| new Object[] { e }); |
| } |
| return; |
| } |
| // Safety check. The input Cas should not be null here |
| if (inputCasReferenceId != null) { |
| try { |
| Endpoint endp = null; |
| |
| // Located the origin of the parent Cas. The produced Cas will inherit the origin from |
| // its parent. |
| // Once the origin is identified, save the origin using the produced Cas id as a key. |
| if (endp == null) { |
| boolean gotTheEndpoint = false; |
| String parentCasId = inputCasReferenceId; |
| // Loop through the parent tree until an origin is found |
| while (!gotTheEndpoint) { |
| // Check if the current parent has an associated origin |
| endp = ((AggregateAnalysisEngineController) getController()) |
| .getMessageOrigin(parentCasId); |
| // Check if there is an origin. If so, we are done |
| if (endp != null) { |
| break; |
| } |
| // The current parent has no origin, get its parent and try again |
| CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS( |
| parentCasId); |
| parentCasId = entry.getInputCasReferenceId(); |
| // Check if we reached the top of the hierarchy tree. If so, we have no origin. |
| // This should |
| // never be the case. Every Cas must have an origin |
| if (parentCasId == null) { |
| break; |
| } |
| } |
| } |
| // If origin not found log it as this indicates an error |
| if (endp == null) { |
| System.out.println("Endpoint Not Found For Cas Id:" + inputCasReferenceId); |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "handleProcessRequestWithCASReference", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_msg_origin_not_found__INFO", |
| new Object[] { getController().getComponentName(), inputCasReferenceId }); |
| } |
| } else { |
| ((AggregateAnalysisEngineController) getController()).addMessageOrigin( |
| casReferenceId, endp); |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.FINEST, |
| CLASS_NAME.getName(), |
| "handleProcessRequestWithCASReference", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_msg_origin_added__FINEST", |
| new Object[] { getController().getComponentName(), casReferenceId, |
| newCASProducedBy }); |
| } |
| } |
| } catch (Exception e) { |
| |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), |
| "handleProcessRequestWithCASReference", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e); |
| } |
| } |
| } else { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb( |
| Level.INFO, |
| CLASS_NAME.getName(), |
| "handleProcessRequestWithCASReference", |
| UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_input_cas_invalid__INFO", |
| new Object[] { getController().getComponentName(), newCASProducedBy, |
| casReferenceId }); |
| } |
| } |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), |
| "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_new_cas__FINE", new Object[] { casReferenceId, newCASProducedBy }); |
| } |
| aMessageContext.getEndpoint().setEndpoint(casMultiplierEndpoint.getEndpoint()); |
| aMessageContext.getEndpoint().setServerURI(casMultiplierEndpoint.getServerURI()); |
| } else { |
| if (getController() instanceof AggregateAnalysisEngineController) { |
| ((AggregateAnalysisEngineController) getController()).addMessageOrigin(casReferenceId, |
| aMessageContext.getEndpoint()); |
| } |
| |
| } |
| cas = getController().getInProcessCache().getCasByReference(casReferenceId); |
| |
| long arrivalTime = System.nanoTime(); |
| getController().saveTime(arrivalTime, casReferenceId, getController().getName());// aMessageContext.getEndpointName()); |
| |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), |
| "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_analyzing_cas__FINE", new Object[] { casReferenceId }); |
| } |
| // Save Process command in the client endpoint. |
| cacheProcessCommandInClientEndpoint(); |
| |
| if (getController().isStopped()) { |
| return; |
| } |
| |
| if (isNewCAS) { |
| invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy); |
| } else { |
| invokeProcess(cas, casReferenceId, null, aMessageContext, newCASProducedBy); |
| } |
| } catch (AsynchAEException e) { |
| throw e; |
| } catch (Exception e) { |
| throw new AsynchAEException(e); |
| } |
| |
| } |
| |
| private void handleProcessRequestWithXCAS(MessageContext aMessageContext) |
| throws AsynchAEException { |
| |
| try { |
| // Get the CAS Reference Id of the input CAS |
| String casReferenceId = getCasReferenceId(aMessageContext); |
| String inputCasReferenceId = casReferenceId; |
| // This is only used when handling CASes produced by CAS Multiplier |
| String newCASProducedBy = null; |
| |
| if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) { |
| // This CAS came in from the CAS Multiplier. Treat it differently than the |
| // input CAS. First, in case the Aggregate needs to send this CAS to the |
| // client, retrieve the client destination by looking up the client endpoint |
| // using input CAS reference id. CASes generated by the CAS multiplier will have |
| // the same Cas Reference id. |
| Endpoint replyToEndpoint = getController().getInProcessCache().getCacheEntryForCAS( |
| casReferenceId).getMessageOrigin(); |
| |
| // |
| if (getController() instanceof AggregateAnalysisEngineController) { |
| newCASProducedBy = ((AggregateAnalysisEngineController) getController()) |
| .lookUpDelegateKey(replyToEndpoint.getEndpoint()); |
| } |
| // MessageContext contains endpoint set by the CAS Multiplier service. Overwrite |
| // this with the endpoint of the client who sent the input CAS. In case this |
| // aggregate is configured to send new CASes to the client we know where to send them. |
| aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint()); |
| aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI()); |
| inputCasReferenceId = String.valueOf(casReferenceId); |
| // Set this to null so that the new CAS gets its own Cas Reference Id below |
| casReferenceId = null; |
| } |
| |
| long arrivalTime = System.nanoTime(); |
| getController().saveTime(arrivalTime, casReferenceId, getController().getName());// aMessageContext.getEndpointName()); |
| |
| // To prevent processing multiple messages with the same CasReferenceId, check the CAS cache |
| // to see if the message with a given CasReferenceId is already being processed. It is, the |
| // message contains the same request possibly issued by the caller due to timeout. Also this |
| // mechanism helps with dealing with scenario when this service is not up when the client |
| // sends |
| // request. The client can keep re-sending the same request until its timeout thresholds are |
| // exceeded. By that time, there may be multiple messages in this service queue with the same |
| // CasReferenceId. When the service finally comes back up, it will have multiple messages in |
| // its queue possibly from the same client. Only the first message for any given |
| // CasReferenceId |
| // should be processed. |
| if (casReferenceId == null |
| || !getController().getInProcessCache().entryExists(casReferenceId)) { |
| String xmi = aMessageContext.getStringMessage(); |
| |
| // ***************************************************************** |
| // ***** NO XMI In Message. Kick this back to sender with exception |
| // ***************************************************************** |
| if (xmi == null) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_message_has_no_cargo__INFO", |
| new Object[] { aMessageContext.getEndpoint().getEndpoint() }); |
| } |
| getController().getOutputChannel().sendReply( |
| new InvalidMessageException("No XMI data in message"), casReferenceId, null, |
| aMessageContext.getEndpoint(), AsynchAEMessage.Process); |
| // Dont process this empty message |
| return; |
| } |
| |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), |
| "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_request_cas__FINE", |
| new Object[] { aMessageContext.getEndpoint().getEndpoint() }); |
| } |
| CAS cas = getController().getCasManagerWrapper().getNewCas(); |
| |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), |
| "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_request_cas_granted__FINE", |
| new Object[] { aMessageContext.getEndpoint().getEndpoint() }); |
| } |
| XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData(); |
| // UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1); |
| uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1); |
| |
| if (casReferenceId == null) { |
| CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext, |
| deserSharedData); |
| casReferenceId = entry.getCasReferenceId(); |
| } else { |
| if (getController() instanceof PrimitiveAnalysisEngineController) { |
| getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, |
| casReferenceId); |
| } |
| } |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), |
| "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_deserialized_cas_ready_to_process_FINE", |
| new Object[] { aMessageContext.getEndpoint().getEndpoint() }); |
| } |
| cacheProcessCommandInClientEndpoint(); |
| |
| invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy); |
| } else { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), |
| "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_duplicate_request__INFO", new Object[] { casReferenceId }); |
| } |
| } |
| |
| } catch (AsynchAEException e) { |
| throw e; |
| } catch (Exception e) { |
| throw new AsynchAEException(e); |
| } |
| |
| } |
| |
| private void cacheProcessCommandInClientEndpoint() { |
| Endpoint clientEndpoint = getController().getClientEndpoint(); |
| if (clientEndpoint != null) { |
| clientEndpoint.setCommand(AsynchAEMessage.Process); |
| } |
| } |
| |
| private void handleCollectionProcessCompleteRequest(MessageContext aMessageContext) |
| throws AsynchAEException { |
| Endpoint replyToEndpoint = aMessageContext.getEndpoint(); |
| getController().collectionProcessComplete(replyToEndpoint); |
| } |
| |
| private void handleReleaseCASRequest(MessageContext aMessageContext) throws AsynchAEException { |
| String casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference); |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), |
| "handleReleaseCASRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_release_cas_req__FINE", |
| new Object[] { getController().getName(), casReferenceId }); |
| } |
| getController().releaseNextCas(casReferenceId); |
| } |
| |
| private void handlePingRequest(MessageContext aMessageContext) { |
| try { |
| getController().getOutputChannel().sendReply(AsynchAEMessage.Ping, |
| aMessageContext.getEndpoint()); |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), |
| "handlePingRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| } |
| } |
| |
| } |
| |
| private void handleStopRequest(MessageContext aMessageContext) { |
| try { |
| String casReferenceId = aMessageContext |
| .getMessageStringProperty(AsynchAEMessage.CasReference); |
| System.out.println("###################Controller::" + getController().getComponentName() |
| + " Received <<<STOP>>> Request For CAS:" + casReferenceId); |
| if (getController() instanceof PrimitiveAnalysisEngineController) { |
| getController().addAbortedCasReferenceId(casReferenceId); |
| } else if (getController() instanceof AggregateAnalysisEngineController_impl) { |
| try { |
| CasStateEntry casStateEntry = getController().getLocalCache().lookupEntry(casReferenceId); |
| // Mark the CAS as if it have failed. In this case we dont associate any |
| // exceptions with this CAS so its really not a failure of a CAS or any |
| // of its children. We simply use the same logic here as if the CAS failed. |
| // The Aggregate replyToClient() method will know that this CAS was stopped |
| // as opposed to failed by the fact that the CAS has no exceptions associated |
| // with it. In such case the replyToClient() method returns an input CAS as if |
| // it has been fully processed. |
| casStateEntry.setFailed(); |
| ((AggregateAnalysisEngineController_impl) getController()).stopCasMultipliers(); |
| } catch (Exception ex) { |
| } // CAS may have already been deleted |
| |
| } |
| |
| } catch (Exception e) { |
| } |
| } |
| |
| /** |
| * Main method called by the predecessor handler. |
| * |
| * |
| */ |
| public void handle(Object anObjectToHandle) // throws AsynchAEException |
| { |
| try { |
| super.validate(anObjectToHandle); |
| |
| MessageContext messageContext = (MessageContext) anObjectToHandle; |
| if (isHandlerForMessage(messageContext, AsynchAEMessage.Request, AsynchAEMessage.Process) |
| || isHandlerForMessage(messageContext, AsynchAEMessage.Request, |
| AsynchAEMessage.CollectionProcessComplete) |
| || isHandlerForMessage(messageContext, AsynchAEMessage.Request, |
| AsynchAEMessage.ReleaseCAS) |
| || isHandlerForMessage(messageContext, AsynchAEMessage.Request, AsynchAEMessage.Stop)) { |
| int payload = messageContext.getMessageIntProperty(AsynchAEMessage.Payload); |
| int command = messageContext.getMessageIntProperty(AsynchAEMessage.Command); |
| |
| getController().getControllerLatch().waitUntilInitialized(); |
| |
| // If a Process Request, increment number of CASes processed |
| if (messageContext.getMessageIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Request |
| && command == AsynchAEMessage.Process |
| && !messageContext.propertyExists(AsynchAEMessage.CasSequence)) { |
| // Increment number of CASes processed by this service |
| getController().getServicePerformance().incrementNumberOfCASesProcessed(); |
| } |
| if (getController().isStopped()) { |
| return; |
| } |
| |
| if (AsynchAEMessage.CASRefID == payload) { |
| // Fetch id of the CAS from the message. |
| if (getCasReferenceId(messageContext) == null) { |
| return; // Invalid message. Nothing to do |
| } |
| |
| handleProcessRequestWithCASReference(messageContext); |
| } else if (AsynchAEMessage.XMIPayload == payload |
| || AsynchAEMessage.BinaryPayload == payload) { |
| // Fetch id of the CAS from the message. |
| if (getCasReferenceId(messageContext) == null) { |
| return; // Invalid message. Nothing to do |
| } |
| handleProcessRequestFromRemoteClient(messageContext); |
| } else if (AsynchAEMessage.XCASPayload == payload) { |
| // Fetch id of the CAS from the message. |
| if (getCasReferenceId(messageContext) == null) { |
| return; // Invalid message. Nothing to do |
| } |
| handleProcessRequestWithXCAS(messageContext); |
| } else if (AsynchAEMessage.None == payload |
| && AsynchAEMessage.CollectionProcessComplete == command) { |
| handleCollectionProcessCompleteRequest(messageContext); |
| } else if (AsynchAEMessage.None == payload && AsynchAEMessage.ReleaseCAS == command) { |
| handleReleaseCASRequest(messageContext); |
| } else if (AsynchAEMessage.None == payload && AsynchAEMessage.Stop == command) { |
| handleStopRequest(messageContext); |
| } else if (AsynchAEMessage.None == payload && AsynchAEMessage.Ping == command) { |
| handlePingRequest(messageContext); |
| } |
| // Handled Request |
| return; |
| } |
| // Not a Request nor Command. Delegate to the next handler in the chain |
| super.delegate(messageContext); |
| } catch (Exception e) { |
| if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { |
| UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), |
| "handle", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, |
| "UIMAEE_exception__WARNING", new Object[] { e }); |
| } |
| getController().getErrorHandlerChain().handle(e, |
| HandlerBase.populateErrorContext((MessageContext) anObjectToHandle), getController()); |
| } |
| } |
| |
| } |