blob: c990d2437c8ed7cde6eb45c37d611cdd34bbcd1f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.adapter.vinci;
import java.net.InetAddress;
import java.util.Properties;
import org.apache.uima.UIMAFramework;
import org.apache.uima.adapter.vinci.util.Constants;
import org.apache.uima.adapter.vinci.util.VinciSaxParser;
import org.apache.uima.analysis_engine.AnalysisEngineServiceStub;
import org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.analysis_engine.service.impl.AnalysisEngineServiceAdapter;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.admin.CASMgr;
import org.apache.uima.cas.impl.CASCompleteSerializer;
import org.apache.uima.cas.impl.CASImpl;
import org.apache.uima.cas.impl.CASSerializer;
import org.apache.uima.cas.impl.Serialization;
import org.apache.uima.internal.util.SerializationUtils;
import org.apache.uima.resource.Parameter;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceServiceException;
import org.apache.uima.resource.metadata.ResourceMetaData;
import org.apache.uima.util.SaxDeserializer;
import org.apache.vinci.transport.VinciClient;
import org.apache.vinci.transport.VinciFrame;
import org.apache.vinci.transport.context.VinciContext;
import org.apache.vinci.transport.document.AFrame;
public class VinciBinaryAnalysisEngineServiceStub implements AnalysisEngineServiceStub {
private static final boolean debug = false;
private VinciClient mVinciClient;
private AnalysisEngineServiceAdapter mOwner;
/**
* Timeout to use for process and collectionProcessComplete calls.
*/
private int mTimeout;
/**
* Timeout to use for getMetaData calls.
*/
private int mGetMetaDataTimeout;
public VinciBinaryAnalysisEngineServiceStub(String endpointURI, AnalysisEngineServiceAdapter owner)
throws ResourceInitializationException {
this(endpointURI, null, owner, null);
}
public VinciBinaryAnalysisEngineServiceStub(String endpointURI, Integer timeout,
AnalysisEngineServiceAdapter owner, Parameter[] parameters)
throws ResourceInitializationException {
mOwner = owner;
// open Vinci connection
try {
VinciContext vctx = new VinciContext(InetAddress.getLocalHost().getCanonicalHostName(), 0);
// Override vinci default VNS settings
String vnsHost = null;
String vnsPort = null;
String getMetaDataTimeout = null;
if (parameters != null) {
vnsHost =
VinciBinaryAnalysisEngineServiceStub.getParameterValueFor("VNS_HOST", parameters);
vnsPort = VinciBinaryAnalysisEngineServiceStub.getParameterValueFor("VNS_PORT",
parameters);
getMetaDataTimeout = VinciBinaryAnalysisEngineServiceStub.getParameterValueFor("GetMetaDataTimeout", parameters);
}
if (vnsHost == null) {
vnsHost = System.getProperty("VNS_HOST");
if (vnsHost == null)
vnsHost = Constants.DEFAULT_VNS_HOST;
}
if (vnsPort == null) {
vnsPort = System.getProperty("VNS_PORT");
if (vnsPort == null)
vnsPort = "9000";
}
vctx.setVNSHost(vnsHost);
vctx.setVNSPort(Integer.parseInt(vnsPort));
// Override socket keepAlive setting
vctx.setSocketKeepAliveEnabled(isSocketKeepAliveEnabled());
if (debug) {
System.out.println("Establishing connnection to " + endpointURI + " using VNS_HOST:"
+ vctx.getVNSHost() + " and VNS_PORT=" + vctx.getVNSPort());
}
// establish connection to service
mVinciClient = new VinciClient(endpointURI, AFrame.getAFrameFactory(), vctx);
//store timeout for use in later RPC calls
if (timeout != null) {
mTimeout = timeout.intValue();
} else {
mTimeout = mVinciClient.getSocketTimeout(); //default
}
if (getMetaDataTimeout != null) {
mGetMetaDataTimeout = Integer.parseInt(getMetaDataTimeout);
} else {
mGetMetaDataTimeout = mVinciClient.getSocketTimeout(); //default
}
if (debug) {
System.out.println("Success");
}
} catch (Exception e) {
throw new ResourceInitializationException(e);
}
}
public static String getParameterValueFor(String aKey, Parameter[] parameters) {
if (aKey != null) {
for (int i = 0; parameters != null && i < parameters.length; i++) {
if (aKey.equals(parameters[i].getName())) {
return parameters[i].getValue();
}
}
}
return null; // aKey not found in parameters
}
/**
* @see org.apache.uima.resource.service.ResourceServiceStb#callGetMetaData()
*/
public ResourceMetaData callGetMetaData() throws ResourceServiceException {
try {
// create Vinci Frame
VinciFrame queryFrame = new VinciFrame();
// Add Vinci Command, so that the receiving service knows what to do
queryFrame.fadd("vinci:COMMAND", "GetMeta");
// Send the request to the service and wait for response
if (debug) {
System.out.println("Calling GetMeta");
}
mVinciClient.setTransportableFactory(AFrame.getAFrameFactory());
VinciFrame resultFrame = mVinciClient.rpc(queryFrame, mGetMetaDataTimeout);
if (debug) {
System.out.println("Success");
}
// Extract the data from Vinci Response frame
// System.out.println(resultFrame.toXML()); //DEBUG
// Remove things from the result frame that are not the MetaData objects we expect.
// In the future other things may go in here.
int i = 0;
while (i < resultFrame.getKeyValuePairCount()) {
String key = resultFrame.getKeyValuePair(i).getKey();
if (key.length() < 8 || !key.substring(key.length() - 8).equalsIgnoreCase("metadata")) {
resultFrame.fdrop(key);
} else {
i++;
}
}
// Parse the XML into the AnalysisEngineMetaData object
SaxDeserializer saxDeser = UIMAFramework.getXMLParser().newSaxDeserializer();
VinciSaxParser vinciSaxParser = new VinciSaxParser();
vinciSaxParser.setContentHandler(saxDeser);
vinciSaxParser.parse(resultFrame);
AnalysisEngineMetaData metadata = (AnalysisEngineMetaData) saxDeser.getObject();
return metadata;
} catch (Exception e) {
throw new ResourceServiceException(e);
}
}
/**
* @see org.apache.uima.analysis_engine.service.AnalysisEngineServiceStub#callProcess(CAS)
*/
public void callProcess(CAS aCAS) throws ResourceServiceException {
try {
AFrame requestFrame = new AFrame();
requestFrame.fset(Constants.VINCI_COMMAND, Constants.ANNOTATE);
// serialize CAS (including type system)
CASMgr cas = (CASMgr) aCAS;
CASCompleteSerializer serializer = Serialization.serializeCASComplete(cas);
requestFrame.fsetTrueBinary("BinaryCAS", SerializationUtils.serialize(serializer));
AFrame responseFrame = (AFrame) mVinciClient.sendAndReceive(requestFrame, mTimeout);
// deserialize CAS from response frame
byte[] responseCasBytes = responseFrame.fgetTrueBinary("BinaryCAS");
CASSerializer responseSerializer = (CASSerializer) SerializationUtils
.deserialize(responseCasBytes);
((CASImpl) cas).reinit(responseSerializer);
// also read annotation time and enter into AnalysisEngineManagementMBean
int annotationTime = responseFrame.fgetInt(Constants.ANNOTATION_TIME);
if (annotationTime > 0) {
AnalysisEngineManagementImpl mbean = (AnalysisEngineManagementImpl) mOwner
.getManagementInterface();
mbean.reportAnalysisTime(annotationTime);
}
} catch (Exception e) {
throw new ResourceServiceException(e);
}
}
/**
* @see org.apache.uima.analysis_engine.service.AnalysisEngineServiceStub#callBatchProcessComplete()
*/
public void callBatchProcessComplete() throws ResourceServiceException {
try {
// create Vinci Frame ( Data Cargo)
AFrame queryFrame = new AFrame();
// Add Vinci Command, so that the receiving service knows what to do
queryFrame.fadd("vinci:COMMAND", Constants.BATCH_PROCESS_COMPLETE);
mVinciClient.send(queryFrame); // oneway call
} catch (Exception e) {
throw new ResourceServiceException(e);
}
}
/**
* @see org.apache.uima.analysis_engine.service.AnalysisEngineServiceStub#callCollectionProcessComplete()
*/
public void callCollectionProcessComplete() throws ResourceServiceException {
try {
// create Vinci Frame ( Data Cargo)
AFrame queryFrame = new AFrame();
// Add Vinci Command, so that the receiving service knows what to do
queryFrame.fadd("vinci:COMMAND", Constants.COLLECTION_PROCESS_COMPLETE);
// make RPC call (return val ignored)
mVinciClient.rpc(queryFrame, mTimeout);
} catch (Exception e) {
throw new ResourceServiceException(e);
}
}
/**
* @see org.apache.uima.resource.service.impl.ResourceServiceStub#destroy()
*/
public void destroy() {
mVinciClient.close();
}
/**
* Gets whether socket keepAlive is enabled, by consulting the
* PerformanceTuningSettings. (If no setting specified, defaults
* to true.)
* @return if socketKeepAlive is enabled
*/
private boolean isSocketKeepAliveEnabled() {
Properties settings = mOwner.getPerformanceTuningSettings();
if (settings != null) {
String enabledStr = (String)settings.get(UIMAFramework.SOCKET_KEEPALIVE_ENABLED);
return !"false".equalsIgnoreCase(enabledStr);
}
return true;
}
}