blob: e6459274b021aca36006c9e0c3d8413578415ede [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.adapter.jms.client;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.jms.Destination;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.error.ErrorContext;
import org.apache.uima.aae.error.MessageTimeoutException;
import org.apache.uima.aae.error.UimaASPingTimeout;
import org.apache.uima.aae.error.UimaASProcessCasTimeout;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientState;
import org.apache.uima.cas.CAS;
import org.apache.uima.util.Level;
public class ClientServiceDelegate extends Delegate {
private static final Class CLASS_NAME = ClientServiceDelegate.class;
private BaseUIMAAsynchronousEngineCommon_impl clientUimaAsEngine;
private String applicationName = "UimaAsClient";
private volatile boolean usesSynchronousAPI;
private Destination freeCasDestination = null;
private Object errorMux = new Object();
private volatile boolean pingTimeout = false;
public ClientServiceDelegate(String serviceName, String anApplicationName,
BaseUIMAAsynchronousEngineCommon_impl engine) {
super.delegateKey = serviceName;
clientUimaAsEngine = engine;
if (anApplicationName != null && anApplicationName.trim().length() > 0) {
applicationName = anApplicationName;
}
}
public boolean isSynchronousAPI() {
return usesSynchronousAPI;
}
public boolean isPingTimeout() {
return pingTimeout;
}
public void resetPingTimeout() {
pingTimeout = false;
}
public void setSynchronousAPI() {
this.usesSynchronousAPI = true;
}
public Destination getFreeCasDestination() {
return freeCasDestination;
}
public void setFreeCasDestination(Destination freeCasDestination) {
this.freeCasDestination = freeCasDestination;
}
public String getComponentName() {
return applicationName;
}
public void handleError(Exception e, ErrorContext errorContext) {
String casReferenceId = null;
CAS cas = null;
ClientRequest cachedRequest = null;
casReferenceId = (String) errorContext.get(AsynchAEMessage.CasReference);
synchronized(errorMux) {
if (!clientUimaAsEngine.running) {
cancelDelegateTimer();
return;
}
int command = ((Integer) errorContext.get(AsynchAEMessage.Command)).intValue();
try {
if (e instanceof MessageTimeoutException) {
switch (command) {
case AsynchAEMessage.Process:
//casReferenceId = (String) errorContext.get(AsynchAEMessage.CasReference);
if (casReferenceId != null) {
cachedRequest = (ClientRequest) clientUimaAsEngine.clientCache.get(casReferenceId);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)
&& getEndpoint() != null) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"handleError", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_process_timeout_WARNING",
new Object[] { getEndpoint().getEndpoint(), clientUimaAsEngine.getBrokerURI(), cachedRequest.getHostIpProcessingCAS() });
}
if (cachedRequest != null && cachedRequest.isRemote()) {
cas = cachedRequest.getCAS();
}
boolean isPingTimeout = false;
if (errorContext.containsKey(AsynchAEMessage.ErrorCause)) {
isPingTimeout = AsynchAEMessage.PingTimeout == (Integer) errorContext
.get(AsynchAEMessage.ErrorCause);
}
if (isPingTimeout && isAwaitingPingReply()) {
// reset only if the connection is valid
if ( clientUimaAsEngine.state != ClientState.RECONNECTING) {
resetAwaitingPingReply();
}
pingTimeout = true;
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_ping_timed_out__WARNING", new Object[] { getKey() });
clientUimaAsEngine.notifyOnTimout(cas, clientUimaAsEngine.getEndPointName(),
BaseUIMAAsynchronousEngineCommon_impl.ProcessTimeout, casReferenceId);
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_process_timeout__WARNING", new Object[] { super.getCasProcessTimeout() });
}
clientUimaAsEngine.notifyOnTimout(cas, clientUimaAsEngine.getEndPointName(),
BaseUIMAAsynchronousEngineCommon_impl.ProcessTimeout, casReferenceId);
}
}
clientUimaAsEngine.clientSideJmxStats.incrementProcessTimeoutErrorCount();
break;
case AsynchAEMessage.GetMeta:
if (isAwaitingPingReply()) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_ping_timed_out__WARNING", new Object[] { clientUimaAsEngine.getEndPointName(),getCasPendingDispatchListSize(),getCasPendingReplyListSize()});
}
super.resetAwaitingPingReply();
synchronized( super.pendingDispatchList ) {
// Fail all CASes in the PendingDispatch list
Iterator<Delegate.DelegateEntry> it = getDelegateCasesPendingDispatch().iterator();
while( clientUimaAsEngine.running && it.hasNext() ) {
DelegateEntry de = it.next();
cachedRequest = (ClientRequest) (clientUimaAsEngine.getCache()).get(de.getCasReferenceId());
if ( cachedRequest != null ) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_client_reject_by_forced_timeout__WARNING", new Object[] { de.getCasReferenceId(), String.valueOf(cachedRequest.getCAS().hashCode())});
}
//dumpDelayedList();
try {
clientUimaAsEngine.handleException(new UimaASProcessCasTimeout("Service Not Responding to Ping - CAS:"+de.getCasReferenceId(), new UimaASPingTimeout("Forced Timeout on CAS in PendingDispatch list. The CAS Has Not Been Dispatched since the Service Appears to be Unavailable")), de.getCasReferenceId(), null,cachedRequest, !cachedRequest.isSynchronousInvocation(), false);
} catch( Exception ex) {
ex.printStackTrace();
}
}
if ( clientUimaAsEngine.running ) {
it.remove();
}
}
}
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_meta_timeout_WARNING", new Object[] { getKey() });
}
// Notifies Listeners and removes ClientRequest instance from the client cache
clientUimaAsEngine.notifyOnTimout(cas, clientUimaAsEngine.getEndPointName(),
BaseUIMAAsynchronousEngineCommon_impl.MetadataTimeout, casReferenceId);
clientUimaAsEngine.clientSideJmxStats.incrementMetaTimeoutErrorCount();
}
break;
case AsynchAEMessage.CollectionProcessComplete:
break;
}
}
} catch (Exception ex) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"handleError", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", ex);
}
}
// Dont release the CAS if synchronous API was used
if (cas != null && !cachedRequest.isSynchronousInvocation()) {
cas.release();
}
}
}
public String enrichProcessCASTimeoutMessage(int aCommand, String casReferenceId, long timeToWait, String timeoutMessage) {
StringBuffer sb = new StringBuffer(timeoutMessage);
try {
if ( casReferenceId != null && clientUimaAsEngine.getCache().containsKey(casReferenceId) ) {
ClientRequest cr =
(ClientRequest)clientUimaAsEngine.getCache().get(casReferenceId);
if ( cr != null ) {
sb.append(". Process CAS on host: "+cr.getHostIpProcessingCAS()+" exceeded configured timeout threshold of "+timeToWait+" ms");
}
}
} catch( Exception e) {
e.printStackTrace();
}
return sb.toString();
}
}