/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.aae.jms_adapter;
import java.util.HashMap;
import java.util.Map;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.client.UimaASStatusCallbackListener;
import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.analysis_engine.AnalysisEngineServiceStub;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.cas.CAS;
import org.apache.uima.collection.EntityProcessStatus;
import org.apache.uima.resource.Parameter;
import org.apache.uima.resource.Resource;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.resource.ResourceServiceException;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.resource.metadata.ResourceMetaData;
import org.apache.uima.util.Level;
/**
 * Service stub that lets a UIMA analysis engine be driven through a UIMA-AS client. Every call on
 * this stub is delegated to a {@link UimaAsynchronousEngine} that is created and initialized in
 * the constructor from the supplied {@link Parameter} list.
 *
 * <p>The stub registers itself as a status callback listener on the engine so that
 * {@link #callCollectionProcessComplete()} can block until the collection-process-complete reply
 * has been delivered to {@link #collectionProcessComplete(EntityProcessStatus)}.
 */
public class JmsAnalysisEngineServiceStub extends UimaAsBaseCallbackListener implements
        AnalysisEngineServiceStub {
  private static final Class<?> CLASS_NAME = JmsAnalysisEngineServiceStub.class;

  /** Parameter name: URL of the broker; stored under {@code UimaAsynchronousEngine.ServerUri}. */
  public static final String PARAM_BROKER_URL = "brokerUrl";

  /** Parameter name: service endpoint; stored under {@code UimaAsynchronousEngine.Endpoint}. */
  public static final String PARAM_ENDPOINT = "endpoint";

  /** Parameter name: process timeout (integer); only applied when greater than zero. */
  public static final String PARAM_TIMEOUT = "timeout";

  /** Parameter name: get-metadata timeout (integer); only applied when greater than zero. */
  public static final String PARAM_GETMETA_TIMEOUT = "getmetatimeout";

  /** Parameter name: collection-process-complete timeout (integer); only applied when &gt; 0. */
  public static final String PARAM_CPC_TIMEOUT = "cpctimeout";

  /**
   * Parameter name: {@code "true"} (case-insensitive) selects binary serialization. The constant
   * name is misspelled but is kept as-is because it is part of the public interface.
   */
  public static final String PARAM_BIN_SERIALIZTION = "binary_serialization";

  /** Monitor guarding {@link #cpcReceived}. */
  private final Object mux = new Object();

  /** Set to true by the CPC callback; read and written only while holding {@link #mux}. */
  private boolean cpcReceived;

  private final UimaAsynchronousEngine uimaEEEngine;

  /**
   * Reads the adapter parameters, builds the application context and initializes the UIMA-AS
   * client engine. Parameter names are matched case-insensitively; unknown names are ignored.
   *
   * @param owner
   *          the resource that owns this stub (not used by this constructor)
   * @param parameters
   *          name/value pairs configuring the client; a null array or null entries are skipped
   * @throws ResourceInitializationException
   *           if a timeout value is not a valid integer or the engine fails to initialize
   */
  public JmsAnalysisEngineServiceStub(Resource owner, Parameter[] parameters)
          throws ResourceInitializationException {
    // read parameters
    String brokerUrl = null;
    String endpoint = null;
    int timeout = 0;
    int getMetaTimeout = 0;
    int cpcTimeout = 0;
    String binarySerialization = null;
    if (parameters != null) {
      for (Parameter parameter : parameters) {
        if (parameter == null) {
          continue;
        }
        String name = parameter.getName();
        if (PARAM_BROKER_URL.equalsIgnoreCase(name)) {
          brokerUrl = parameter.getValue();
        } else if (PARAM_ENDPOINT.equalsIgnoreCase(name)) {
          endpoint = parameter.getValue();
        } else if (PARAM_TIMEOUT.equalsIgnoreCase(name)) {
          timeout = parseIntParameter(parameter);
        } else if (PARAM_BIN_SERIALIZTION.equalsIgnoreCase(name)) {
          binarySerialization = parameter.getValue();
        } else if (PARAM_GETMETA_TIMEOUT.equalsIgnoreCase(name)) {
          getMetaTimeout = parseIntParameter(parameter);
        } else if (PARAM_CPC_TIMEOUT.equalsIgnoreCase(name)) {
          cpcTimeout = parseIntParameter(parameter);
        }
      }
    }

    // initialize UIMA EE Engine
    Map<String, Object> appCtxt = new HashMap<String, Object>();
    appCtxt.put(UimaAsynchronousEngine.ServerUri, brokerUrl);
    appCtxt.put(UimaAsynchronousEngine.Endpoint, endpoint);
    appCtxt.put(UimaAsynchronousEngine.CasPoolSize, 0);
    if (timeout > 0) {
      System.out.println("Setting Process Timeout: " + timeout);
      appCtxt.put(UimaAsynchronousEngine.Timeout, timeout);
    }
    if (getMetaTimeout > 0) {
      System.out.println("Setting GetMeta Timeout: " + getMetaTimeout);
      appCtxt.put(UimaAsynchronousEngine.GetMetaTimeout, getMetaTimeout);
    }
    if (cpcTimeout > 0) {
      System.out.println("Setting CPC Timeout: " + cpcTimeout);
      appCtxt.put(UimaAsynchronousEngine.CpcTimeout, cpcTimeout);
    }
    if (binarySerialization != null && binarySerialization.equalsIgnoreCase("true")) {
      System.out.println("Using binary serialization");
      appCtxt.put(UimaAsynchronousEngine.SerializationStrategy, "binary");
    }
    uimaEEEngine = new BaseUIMAAsynchronousEngine_impl();
    // Register before initialize() so no status callback can be missed.
    uimaEEEngine.addStatusCallbackListener(this);
    uimaEEEngine.initialize(appCtxt);
    System.out.println("adapter init complete");
  }

  /**
   * Parses the value of an integer-valued parameter, converting a malformed (or missing) value
   * into the checked exception the constructor already declares.
   */
  private static int parseIntParameter(Parameter parameter)
          throws ResourceInitializationException {
    String value = parameter.getValue();
    try {
      return Integer.parseInt(value);
    } catch (NumberFormatException e) {
      throw new ResourceInitializationException(new IllegalArgumentException(
              "Invalid integer value '" + value + "' for parameter '" + parameter.getName() + "'",
              e));
    }
  }

  /**
   * Returns the metadata reported by the UIMA-AS client engine, after marking it as allowing
   * multiple deployment.
   *
   * @return the engine's metadata, never null
   * @throws ResourceServiceException
   *           if the engine throws while producing metadata, or returns no metadata
   */
  public ResourceMetaData callGetMetaData() throws ResourceServiceException {
    try {
      ResourceMetaData rmd = uimaEEEngine.getMetaData();
      if (rmd != null) {
        ProcessingResourceMetaData processingMetaData = (ProcessingResourceMetaData) rmd;
        // Guard against metadata that carries no operational properties instead of failing
        // with a NullPointerException.
        if (processingMetaData.getOperationalProperties() != null) {
          processingMetaData.getOperationalProperties().setMultipleDeploymentAllowed(true);
        }
        return rmd;
      }
    } catch (ResourceInitializationException e) {
      throw new ResourceServiceException(e);
    }
    throw new ResourceServiceException(new Exception("Uima AS getMetaData() call failed."));
  }

  /**
   * Returns the result of {@link #callGetMetaData()} cast to {@link AnalysisEngineMetaData}.
   *
   * @return the engine's metadata
   * @throws ResourceServiceException
   *           if {@link #callGetMetaData()} fails
   */
  public AnalysisEngineMetaData callGetAnalysisEngineMetaData() throws ResourceServiceException {
    return (AnalysisEngineMetaData) callGetMetaData();
  }

  /**
   * Sends the CAS to the service through the engine's {@code sendAndReceiveCAS} call.
   *
   * @param aCAS
   *          the CAS to process
   * @throws ResourceServiceException
   *           wrapping any {@link ResourceProcessException} thrown by the engine
   */
  public void callProcess(CAS aCAS) throws ResourceServiceException {
    try {
      uimaEEEngine.sendAndReceiveCAS(aCAS);
    } catch (ResourceProcessException e) {
      throw new ResourceServiceException(e);
    }
  }

  /**
   * Equivalent to {@link #callProcess(CAS)}.
   *
   * @param aCAS
   *          the CAS to process
   * @throws ResourceServiceException
   *           if {@link #callProcess(CAS)} fails
   */
  public void callProcessCas(CAS aCAS) throws ResourceServiceException {
    callProcess(aCAS);
  }

  /**
   * Stops the UIMA-AS client engine. Any exception raised while stopping is logged at WARNING
   * level and otherwise ignored, so this method never throws.
   */
  public void destroy() {
    try {
      uimaEEEngine.stop();
    } catch (Exception e) {
      if (UIMAFramework.getLogger().isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger().log(Level.WARNING, e.getMessage(), e);
      }
    }
  }

  /**
   * Batch-process-complete is not supported. This is deliberately a no-op rather than an error,
   * since it is called in the normal course of CPE processing.
   */
  public void callBatchProcessComplete() throws ResourceServiceException {
    // intentionally empty
  }

  /**
   * Issues a collection-process-complete request and blocks until the reply has been delivered
   * to {@link #collectionProcessComplete(EntityProcessStatus)}.
   *
   * <p>If the waiting thread is interrupted, the interruption is logged and the wait continues;
   * the thread's interrupt status is restored before this method returns so callers can still
   * observe it.
   *
   * @throws ResourceServiceException
   *           wrapping any {@link ResourceProcessException} thrown by the engine
   */
  public void callCollectionProcessComplete() throws ResourceServiceException {
    // Reset the flag under the same monitor that guards every other access to it.
    synchronized (mux) {
      cpcReceived = false;
    }
    // The engine is called without holding the monitor: the reply callback needs it.
    try {
      uimaEEEngine.collectionProcessingComplete();
    } catch (ResourceProcessException e) {
      throw new ResourceServiceException(e);
    }

    // make this routine synchronous
    boolean interrupted = false;
    try {
      synchronized (mux) {
        while (!cpcReceived) {
          try {
            mux.wait();
          } catch (InterruptedException e) {
            // Only here if something interrupts this thread. Remember it and keep waiting;
            // re-interrupting now would make the next wait() fail immediately.
            interrupted = true;
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                      "callCollectionProcessComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAJMS_exception__WARNING",
                      new Object[] { Thread.currentThread().getName(), e });
            }
          }
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Status callback: records that the collection-process-complete reply arrived and wakes any
   * thread blocked in {@link #callCollectionProcessComplete()}.
   *
   * @param aStatus
   *          status of the completed request (not inspected)
   */
  public void collectionProcessComplete(EntityProcessStatus aStatus) {
    synchronized (mux) {
      cpcReceived = true;
      mux.notifyAll();
    }
  }

  /**
   * Status callback for a processed CAS; not used by this stub.
   *
   * @param aCas
   *          the processed CAS (ignored)
   * @param aStatus
   *          status of the processing (ignored)
   */
  public void entityProcessComplete(CAS aCas, EntityProcessStatus aStatus) {
    // not used
  }

  /**
   * Status callback for completed initialization; not used by this stub.
   *
   * @param aStatus
   *          status of the initialization (ignored)
   */
  public void initializationComplete(EntityProcessStatus aStatus) {
    // not used
  }
}