blob: 581411387906a5b48b5c680ab456b12a8341073e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.camel;
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.uima.aae.client.UimaASProcessStatusImpl;
import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.cas.CAS;
import org.apache.uima.collection.EntityProcessStatus;
import org.apache.uima.jms.error.handler.BrokerConnectionException;
import org.apache.uima.resource.ResourceInitializationException;
/**
* The <code>UimaAsProducer</code> sends the body of a message to a UIMA AS processing
* pipeline.
*
* Configuration URI:
* uimadriver:authority?queue="nameofqueue"
*
* For example:
* uimadriver:tcp://localhost:61616?queue=TextAnalysisQueue
*/
@SuppressWarnings("rawtypes")
public class UimaAsProducer extends DefaultProducer implements AsyncProcessor {
private static final Log LOG = LogFactory.getLog(UimaAsProducer.class);
private static class ExchangeAsyncCallbackPair {
Exchange exchange;
AsyncCallback callback;
}
private static class UimaStatusCallbackListener extends UimaAsBaseCallbackListener {
/**
* Shared intermediate map instance. Its used to map the cas reference id
* to the input <code>Exchange</code>.
*/
private final Map<String, ExchangeAsyncCallbackPair> intermediateMap;
private UimaStatusCallbackListener(Map<String, ExchangeAsyncCallbackPair> intermediateMap) {
this.intermediateMap = intermediateMap;
}
public void initializationComplete(EntityProcessStatus aStatus) {
if (aStatus != null && aStatus.isException()) {
LOG.warn("Error on initializing: " + aStatus.getStatusMessage());
}
}
public void entityProcessComplete(CAS aCas, EntityProcessStatus aStatus) {
UimaASProcessStatusImpl statusImpl = (UimaASProcessStatusImpl) aStatus;
String referenceId = statusImpl.getCasReferenceId();
ExchangeAsyncCallbackPair exchangeCallbackPair;
synchronized (intermediateMap) {
exchangeCallbackPair = intermediateMap.remove(referenceId);
}
if (exchangeCallbackPair != null) {
if (aStatus.isException()) {
for (Exception e : aStatus.getExceptions()) {
LOG.warn(e);
}
exchangeCallbackPair.exchange.setException(new Exception(aStatus.getStatusMessage()));
}
exchangeCallbackPair.callback.done(false);
} else {
// If the connection to the broker is lost all outstanding CASes
// will not come back. All outstanding camel messages should
// be reported as failed.
if (aStatus.isException()) {
for (Exception exception : aStatus.getExceptions()) {
if (exception instanceof BrokerConnectionException) {
LOG.warn("Connection to broker lost, report all outstanding messages " +
"as failed!");
for (ExchangeAsyncCallbackPair callback :
intermediateMap.values()) {
callback.exchange.setException(exception);
callback.callback.done(false);
}
return;
}
}
}
LOG.warn("Could not find callback for CAS id: " + referenceId);
}
}
public void collectionProcessComplete(EntityProcessStatus aStatus) {
if (aStatus != null && aStatus.isException()) {
LOG.warn("Error on collection process complete: " + aStatus.getStatusMessage());
}
}
}
private UimaAsynchronousEngine uimaAsEngine;
/**
* The intermediate map keeps all {@link Exchange}s and their callbacks until asynchronous
* processing is finished.
*/
private final Map<String, ExchangeAsyncCallbackPair> intermediateMap;
@SuppressWarnings("unchecked")
public UimaAsProducer(String brokerAddress, String queue, Integer casPoolSize, Integer timeout, Endpoint endpoint)
throws Exception {
super(endpoint);
intermediateMap = new HashMap<String, ExchangeAsyncCallbackPair>();
uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
uimaAsEngine.addStatusCallbackListener(new UimaStatusCallbackListener(intermediateMap));
Map<String, Object> appCtx = new HashMap<String, Object>();
appCtx.put(UimaAsynchronousEngine.ServerUri, brokerAddress);
appCtx.put(UimaAsynchronousEngine.ENDPOINT, queue);
if (casPoolSize != null)
appCtx.put(UimaAsynchronousEngine.CasPoolSize, casPoolSize.intValue());
if (timeout != null)
appCtx.put(UimaAsynchronousEngine.Timeout, timeout.intValue());
try {
uimaAsEngine.initialize(appCtx);
} catch (ResourceInitializationException e) {
throw e;
}
}
/**
* Not implemented, since the producer implements the AsyncProcessor interface.
*/
public void process(Exchange exchange) throws Exception {
}
public boolean process(Exchange exchange, AsyncCallback callback) {
String rowId = exchange.getIn().getBody(String.class);
String refernceId;
try {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText(rowId);
// The intermediate map must be locked for sending
// a CAS to the service to guarantee that the reference
// id for the CAS is inserted into the intermediate map
// before the call back listener tries to map
// the just returned reference id to an exchange
synchronized (intermediateMap) {
refernceId = uimaAsEngine.sendCAS(cas);
ExchangeAsyncCallbackPair exchangeCallback = new ExchangeAsyncCallbackPair();
exchangeCallback.exchange = exchange;
exchangeCallback.callback = callback;
intermediateMap.put(refernceId, exchangeCallback);
}
} catch (Exception e) {
LOG.warn("Failed to send CAS", e);
// Processing of the exchange failed
// The error message is set on the exchange
exchange.setException(e);
// and the method returns synchronously with true
callback.done(true);
return true;
}
// Processing will be completed asynchronously
return false;
}
}