blob: 8bef4718e419456e64dcb98733884096f13cc177 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.collection.impl.cpm.container.deployer.vinci;
import java.net.ConnectException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import javax.naming.TimeLimitExceededException;
import org.apache.uima.UIMAFramework;
import org.apache.uima.collection.base_cpm.AbortCPMException;
import org.apache.uima.collection.base_cpm.CasProcessor;
import org.apache.uima.collection.impl.base_cpm.container.CasProcessorConfiguration;
import org.apache.uima.collection.impl.base_cpm.container.KillPipelineException;
import org.apache.uima.collection.impl.base_cpm.container.ProcessingContainer;
import org.apache.uima.collection.impl.base_cpm.container.ServiceConnectionException;
import org.apache.uima.collection.impl.base_cpm.container.deployer.CasProcessorDeployer;
import org.apache.uima.collection.impl.base_cpm.container.deployer.CasProcessorDeploymentException;
import org.apache.uima.collection.impl.cpm.Constants;
import org.apache.uima.collection.impl.cpm.container.CPEFactory;
import org.apache.uima.collection.impl.cpm.container.CasProcessorConfigurationJAXBImpl;
import org.apache.uima.collection.impl.cpm.container.NetworkCasProcessorImpl;
import org.apache.uima.collection.impl.cpm.container.ProcessingContainer_Impl;
import org.apache.uima.collection.impl.cpm.container.ServiceProxyPool;
import org.apache.uima.collection.impl.cpm.container.deployer.VinciTAP;
import org.apache.uima.collection.impl.cpm.container.deployer.vns.LocalVNS;
import org.apache.uima.collection.impl.cpm.container.deployer.vns.VNSQuery;
import org.apache.uima.collection.impl.cpm.container.deployer.vns.VinciServiceInfo;
import org.apache.uima.collection.impl.cpm.engine.BoundedWorkQueue;
import org.apache.uima.collection.impl.cpm.engine.CPMEngine;
import org.apache.uima.collection.impl.cpm.utils.CPMUtils;
import org.apache.uima.collection.impl.cpm.utils.CpmLocalizedMessage;
import org.apache.uima.collection.impl.cpm.utils.Execute;
import org.apache.uima.collection.metadata.CpeCasProcessor;
import org.apache.uima.resource.Parameter;
import org.apache.uima.resource.ResourceConfigurationException;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.resource.URISpecifier;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.util.Level;
import org.apache.uima.util.XMLInputSource;
import org.apache.vinci.transport.ServiceDownException;
import org.apache.vinci.transport.ServiceException;
/**
* Reference implementation of
* {@link org.apache.uima.collection.impl.base_cpm.container.deployer.CasProcessorDeployer} This
* component enables the CPE to deploy Cas Processors running as a Vinci service. Two deployment
* models are supported in the current implementation:
* <ul>
* <li>managed deployment (aka local)</li>
* <li>unmanaged deployment (aka remote)</li>
* </ul>
* Managed deployment gives the CPE control over the life cycle of the Cas Processor. The CPE
* starts, restarts and shuts down the Cas Processor running as a Vinci service. This service is
* launched on the same machine as the CPE but in a separate process.
*
* Unmanaged deployment does not provide the CPE control over the Cas Processor that may be running
* on a remote machine. The CPE assumes that such Cas Processor is managed by a separate application
* and is always available.
*
* For the managed deployment the CPE uses its internal VNS (Vinci Naming Service), to which the
* Cas Processor must connect. The VNS issues a port for the Cas Processor to run on and creates a
* proxy to it.
*
* For unmanaged Cas Processor the CPE creates a proxy to remote service. The remote Cas Processor
* service is discovered with help of VNS running on a host and port defined in the Cpe descriptor
* for this Cas Processor.
*
*/
public class VinciCasProcessorDeployer implements CasProcessorDeployer {
/** The Constant LOCAL_VNS. */
public static final String LOCAL_VNS = "LOCAL_VNS";
/**
 * The Constant WAIT_TIME. NOTE(review): neither the unit nor the consumer of this value is visible
 * in this part of the file -- presumably milliseconds; confirm at the usage site.
 */
public static final int WAIT_TIME = 150;
/** The Constant MAX_WAIT_TRIES. */
public static final int MAX_WAIT_TRIES = 5000;
/** The Constant CONN_RETRY_COUNT. */
public static final String CONN_RETRY_COUNT = "CONN_RETRY_COUNT";
/** The Constant DEFAULT_VNS_PORT (held as a String). */
public static final String DEFAULT_VNS_PORT = "9005";
// The service port range is described by its first port and by the size of the range. The two
// constants below say that the first port is 10000 and the range is from 10000 to 13000.
/** The Constant DEFAULT_SERVICE_PORT: first port of the service port range. */
public static final int DEFAULT_SERVICE_PORT = 10000;
/** The Constant DEFAULT_SERVICE_PORT_RANGE: size of the service port range. */
public static final int DEFAULT_SERVICE_PORT_RANGE = 3000;
/** The Constant SLEEP_TIME. */
public static final int SLEEP_TIME = 1000;
/** Pool of Cas Processor instances; (re)created by deployCasProcessor(List, boolean). */
private ServiceProxyPool casProcessorPool = null;
/** Thread for the local VNS. */
private Thread localVNSThread = null;
// Local VNS is a shared instance across all instances of VinciCasProcessorDeployer. It uses the
// shared portQueue (defined below). VNS and the queue are instantiated once.
/** The shared local VNS. */
private static volatile LocalVNS vns = null;
/** The restart count. */
private int restartCount = 0;
// Pool for port numbers assigned to Cas Processors running as vinci services deployed as managed
// Cas Processors.
/** The shared port queue. */
private static BoundedWorkQueue portQueue = null;
/** Factory giving access to the CPE configuration; set in the constructor. */
private CPEFactory cpeFactory = null;
/** The monitor. */
private final Object monitor = new Object(); // must be unique object, used for synch
/** Service list cached by attachToServices from the most recent VNS lookup. */
private ArrayList currentServiceList = null;
/**
 * Creates the deployer and hands it the factory through which the CPE configuration is read.
 *
 * @param aCpeFactory factory giving access to the CPE configuration
 */
public VinciCasProcessorDeployer(CPEFactory aCpeFactory) {
  this.cpeFactory = aCpeFactory;
}
/**
 * Deploys the Cas Processors found in <i>aCasProcessorList</i>. This overload only forwards to
 * {@link #deployCasProcessor(List, boolean)}; the engine argument is accepted but not used.
 *
 * @param aCasProcessorList list containing instantiated Cas Processors
 * @param aEngine the CPM engine (ignored by this implementation)
 * @param redeploy true when redeploying a failed Cas Processor
 * @return ProcessingContainer holding the pool of Cas Processors
 * @throws ResourceConfigurationException if the delegate fails to deploy
 */
public ProcessingContainer deployCasProcessor(List aCasProcessorList, CPMEngine aEngine,
    boolean redeploy) throws ResourceConfigurationException {
  // The two-argument overload does all the work.
  final ProcessingContainer deployedContainer = this.deployCasProcessor(aCasProcessorList,
      redeploy);
  return deployedContainer;
}
/**
 * Deploys CasProcessor using configuration from the provided container. This method is used for
 * re-launching a failed Cas Processor: it always deploys with the redeploy flag set.
 *
 * @param aProcessingContainer container for the deployed CasProcessor
 * @throws ResourceConfigurationException if the deployment fails; an unexpected failure is
 *         attached as the cause
 */
@Override
public void deployCasProcessor(ProcessingContainer aProcessingContainer)
    throws ResourceConfigurationException {
  // Remembered outside the try block so that the error path never has to touch the container
  // again (doing so could throw a second exception and hide the original failure).
  String name = null;
  try {
    CasProcessorConfiguration casProcessorConfig = aProcessingContainer
        .getCasProcessorConfiguration();
    name = casProcessorConfig.getName();
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
          "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
          "UIMA_CPM_reeploying_service__FINEST",
          new Object[] { Thread.currentThread().getName(), name });
    }
    // Re-deploy the failed Cas Processor according to the mode defined in the cpe descriptor.
    // true - stands for re-launch or restart.
    deployBasedOnModel(aProcessingContainer, casProcessorConfig, true);
  } catch (ResourceConfigurationException e) {
    throw e;
  } catch (Exception e) {
    // Keep the original exception as the cause so the real reason is not lost.
    throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
        "UIMA_CPM_unable_to_deploy_service__SEVERE", new Object[] {
            Thread.currentThread().getName(), name }, e);
  }
}
/**
 * Deploys the CasProcessors in the provided List. The List contains instances of Cas Processors
 * that are not yet bound to a vinci service. The first entry triggers creation of the instance
 * pool, of the configuration read from the CPE descriptor and of the container; every entry is
 * then added to that pool. Finally the container is deployed according to the deployment mode
 * from the cpe descriptor. The deployment is always started as an initial one: the
 * <i>redeploy</i> argument is not read by this method.
 *
 * @param aCasProcessorList list of CasProcessors to deploy
 * @param redeploy true if the intent is to redeploy a failed service
 * @return the ProcessingContainer managing the deployed instances
 * @throws ResourceConfigurationException if no name, container or configuration is available, or
 *         wrapping any other failure
 */
@Override
public ProcessingContainer deployCasProcessor(List aCasProcessorList, boolean redeploy)
    throws ResourceConfigurationException {
  ProcessingContainer container = null;
  try {
    // Cas Processors are handled strictly in list order, one after another.
    for (Object listEntry : aCasProcessorList) {
      CasProcessor currentProcessor = (CasProcessor) listEntry;
      // There is only one container per CasProcessor type. It is built while handling the very
      // first instance; all later instances merely join the container's instance pool.
      if (container == null) {
        ProcessingResourceMetaData metaData = currentProcessor.getProcessingResourceMetaData();
        CpeCasProcessor descriptorEntry = (CpeCasProcessor) cpeFactory.casProcessorConfigMap
            .get(metaData.getName());
        // Pool holding the instances; the container hands them out and takes them back.
        casProcessorPool = new ServiceProxyPool();
        // Configuration as defined in the CPE descriptor.
        CasProcessorConfiguration descriptorConfig = new CasProcessorConfigurationJAXBImpl(
            descriptorEntry, cpeFactory.getResourceManager());
        container = new ProcessingContainer_Impl(descriptorConfig, metaData, casProcessorPool);
        container.setCasProcessorDeployer(this);
        // Each CasProcessor must have a name.
        if (descriptorConfig.getName() == null) {
          if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE,
                this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_unable_to_read_meta__SEVERE", Thread.currentThread().getName());
          }
          throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
              "UIMA_CPM_casprocessor_no_name_found__SEVERE",
              new Object[] { Thread.currentThread().getName() });
        }
      }
      casProcessorPool.addCasProcessor(currentProcessor);
    }
    // An empty list leaves us without a container.
    if (container == null) {
      throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
          "UIMA_CPM_invalid_container__SEVERE",
          new Object[] { Thread.currentThread().getName() });
    }
    // The container is expected to carry the CasProcessor configuration.
    CasProcessorConfiguration containerConfig = container.getCasProcessorConfiguration();
    if (containerConfig == null) {
      throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
          "UIMA_CPM_container_configuration_not_found__SEVERE",
          new Object[] { Thread.currentThread().getName() });
    }
    deployBasedOnModel(container, containerConfig, false);
  } catch (ResourceConfigurationException e) {
    throw e;
  } catch (Exception e) {
    throw new ResourceConfigurationException(e);
  }
  return container;
}
/**
 * Dispatches the deployment to the routine matching the deployment type found in the
 * CasProcessor configuration (local, remote or integrated).
 *
 * @param aProcessingContainer container managing instances of the CasProcessor
 * @param aCasProcessorConfig CasProcessor configuration
 * @param redeploy flag indicating whether this is a re-deployment of a failed CasProcessor
 *
 * @throws ResourceConfigurationException if the deployment type is unknown or the selected
 *         routine fails
 */
private void deployBasedOnModel(ProcessingContainer aProcessingContainer,
    CasProcessorConfiguration aCasProcessorConfig, boolean redeploy)
    throws ResourceConfigurationException {
  final String deploymentType = aCasProcessorConfig.getDeploymentType();
  final String processorName = aCasProcessorConfig.getName();
  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
        "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_deploy_with_mode__FINEST",
        new Object[] { Thread.currentThread().getName(), processorName, deploymentType });
  }
  if (Constants.DEPLOYMENT_LOCAL.equals(deploymentType)) {
    // Same machine as the CPM but a different JVM.
    deployLocal(aProcessingContainer, redeploy);
    return;
  }
  if (Constants.DEPLOYMENT_REMOTE.equals(deploymentType)) {
    // Instantiate and connect a proxy to an existing, running service.
    deployRemote(aProcessingContainer, redeploy);
    return;
  }
  if (Constants.DEPLOYMENT_INTEGRATED.equals(deploymentType)) {
    // Collocated with the CPM.
    deployIntegrated(aProcessingContainer, redeploy);
    return;
  }
  // None of the supported deployment types matched.
  if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
    UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
        "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
        "UIMA_CPM_unsupported_deploy_mode__SEVERE",
        new Object[] { Thread.currentThread().getName(), processorName, deploymentType });
  }
  throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
      "UIMA_CPM_unsupported_deploy_mode__SEVERE", new Object[] {
          Thread.currentThread().getName(), processorName, deploymentType });
}
/**
 * Deploys CasProcessor as a Local Vinci Service. The CPE launches the Cas Processor in a
 * separate process but on the same machine. In this deployment mode the CPE uses its own VNS,
 * which is started here on first use together with the shared port queue. On an initial
 * deployment one service is launched per processing thread; on a redeployment exactly one
 * service is launched. Afterwards the metadata of the service is stored in the container.
 *
 * @param aProcessingContainer container object that will hold proxies to the Cas Processor when
 *        it is launched
 * @param redeploy true if the intent is to redeploy a failed service
 * @throws ResourceConfigurationException if the descriptor does not define the Cas Processor to
 *         run in a separate process (thrown as is, not wrapped), or wrapping any other
 *         exception
 */
private void deployLocal(ProcessingContainer aProcessingContainer, boolean redeploy)
    throws ResourceConfigurationException {
  // The configuration is associated with the container and shared by all replicated instances
  // of the Cas Processor.
  CasProcessorConfiguration casProcessorConfig = aProcessingContainer
      .getCasProcessorConfiguration();
  String name = casProcessorConfig.getName();
  try {
    if (!casProcessorConfig.runInSeparateProcess()) {
      throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
          "UIMA_CPM_invalid_cpe_descriptor__SEVERE", new Object[] {
              Thread.currentThread().getName(), name, "runInSeparateProcess" });
    }
    if (redeploy) {
      // The service failed: re-launch a single instance.
      launchLocalService(aProcessingContainer, casProcessorConfig, redeploy, 1);
    } else {
      // Number of instances to launch, as configured for the processing pipelines.
      int concurrentThreadCount = cpeFactory.getProcessingUnitThreadCount();
      // Local Vinci services need a local VNS, which determines the ports the services run on.
      // The VNS and the port queue are shared by all instances of this class.
      if (vns == null) {
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
              this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
              "UIMA_CPM_deploy_vns__FINEST",
              new Object[] { Thread.currentThread().getName() });
        }
        portQueue = new BoundedWorkQueue(concurrentThreadCount, "Port Queue", null);
        // Starts the VNS in its own thread.
        deployVNS(casProcessorConfig, redeploy);
      }
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
            this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
            "UIMA_CPM_deploy_with_mode__FINEST",
            new Object[] { Thread.currentThread().getName(), casProcessorConfig.getName(),
                casProcessorConfig.getDeploymentType() });
      }
      // Launch as many application instances as there are processing pipeline threads.
      launchLocalService(aProcessingContainer, casProcessorConfig, redeploy,
          concurrentThreadCount);
    }
    associateMetadataWithContainer(aProcessingContainer);
  } catch (ResourceConfigurationException e) {
    // Already the right type: rethrow untouched so its message key stays visible to callers,
    // exactly as deployRemote and deployCasProcessor do.
    throw e;
  } catch (Exception e) {
    throw new ResourceConfigurationException(e);
  }
}
/**
 * Retrieves the metadata from one pooled Cas Processor and stores it in the container. The
 * processor is borrowed from the instance pool and always returned to it. Failures are logged
 * and otherwise ignored, so this method never throws because of them.
 *
 * @param aProcessingContainer container where the metadata will be saved; nothing is stored if
 *        it is null
 */
private void associateMetadataWithContainer(ProcessingContainer aProcessingContainer) {
  if (casProcessorPool == null) {
    return;
  }
  CasProcessor processor = null;
  try {
    processor = casProcessorPool.checkOut();
    if (processor != null) {
      ProcessingResourceMetaData metadata = processor.getProcessingResourceMetaData();
      if (aProcessingContainer != null && metadata != null) {
        aProcessingContainer.setMetadata(metadata);
      }
    }
  } catch (Exception e) {
    // Best effort only: report through the framework logger instead of stderr and carry on.
    UIMAFramework.getLogger(this.getClass()).log(Level.WARNING,
        "Unable to associate Cas Processor metadata with its container", e);
  } finally {
    // Give the borrowed instance back no matter what happened above.
    if (processor != null) {
      casProcessorPool.checkIn(processor);
    }
  }
}
/**
 * Launches an application as a separate process using java's Runtime. Actually, it launches as
 * many copies of the application as given in the <i>howMany</i> parameter and then connects
 * proxies to the launched services.
 * <p>
 * When the "os.name" system property equals "linux" (ignoring case) the command line is joined
 * into a single string, its output is appended to a log file located in the directory named by
 * the LOG_DIR system property (if set), and the string is run through "/bin/sh -c". Otherwise the
 * command array is executed directly. Any argument containing "-DVNS_PORT" is replaced in place
 * with the port of the local VNS so that the launched Cas Processor can locate it.
 *
 * @param aProcessingContainer container that receives the proxies
 * @param casProcessorConfig configuration containing the start up command
 * @param redeploy true if the intent is to redeploy a failed application
 * @param howMany how many separate processes to spawn
 * @throws CasProcessorDeploymentException wraps any exception
 */
private void launchLocalService(ProcessingContainer aProcessingContainer,
    CasProcessorConfiguration casProcessorConfig, boolean redeploy, int howMany)
    throws CasProcessorDeploymentException {
  try {
    Execute exec = casProcessorConfig.getExecSpec();
    String[] cmd = exec.getCmdLine();
    String processorName = casProcessorConfig.getName();
    boolean onLinux = System.getProperty("os.name").equalsIgnoreCase("linux");

    // Tell the Cas Processor which VNS port is in use and, on the way, build the single-string
    // form of the command needed for "/bin/sh -c".
    StringBuilder shellLine = new StringBuilder();
    for (int i = 0; i < cmd.length; i++) {
      if (cmd[i] != null && cmd[i].indexOf("-DVNS_PORT") > -1) {
        cmd[i] = "-DVNS_PORT=" + vns.getVNSPort();
      }
      shellLine.append(' ').append(cmd[i]);
    }

    String logDir = System.getProperty("LOG_DIR");
    if (logDir == null) {
      logDir = "";
    } else if (!logDir.endsWith("/") && !logDir.endsWith("\\")) {
      // Append OS specific separator
      logDir += System.getProperty("file.separator");
    }
    // Quote the complete redirect target, directory included, so that a LOG_DIR containing
    // spaces does not split the shell command.
    shellLine.append(" >> \"").append(logDir).append(processorName)
        .append(System.currentTimeMillis()).append(".log\" 2>&1");

    // What is actually handed to Runtime.exec().
    String[] command = onLinux ? new String[] { "/bin/sh", "-c", shellLine.toString() } : cmd;

    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
          "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_vns_started__FINEST",
          new Object[] { Thread.currentThread().getName(), processorName });
    }

    // Spawn off as many processes as requested.
    for (int remaining = howMany; remaining > 0; remaining--) {
      logLaunchValues("UIMA_CPM_launching_with_service_cmd__FINEST", processorName, cmd);
      String[] env = exec.getEnvironment();
      // A null envp makes the child inherit this JVM's environment. An empty array must be
      // mapped to null as well, otherwise the child would start with no environment at all.
      String[] envp = (env != null && env.length > 0) ? env : null;
      logLaunchValues("UIMA_CPM_launching_with_service_env__FINEST", processorName, envp);
      if (onLinux) {
        logLaunchValues("UIMA_CPM_launching_with_service_exec__FINEST", processorName, command);
      }
      // The returned Process handle is not kept.
      Runtime.getRuntime().exec(command, envp);
    }

    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
          "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
          "UIMA_CPM_connecting_to_services__FINEST",
          new Object[] { Thread.currentThread().getName(), processorName });
    }
    // Services have been started so now connect proxies to them.
    connectToServices(aProcessingContainer, casProcessorConfig, redeploy, howMany);
  } catch (Exception e) {
    UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE,
        "Unable to launch the local Cas Processor service or to connect to it", e);
    throw new CasProcessorDeploymentException(e);
  }
}

/**
 * Logs every element of <i>aValues</i>, together with its index, at FINEST level.
 *
 * @param aMessageKey key of the localized message to log
 * @param aProcessorName name of the Cas Processor being launched
 * @param aValues values to log; nothing is logged if null
 */
private void logLaunchValues(String aMessageKey, String aProcessorName, String[] aValues) {
  if (aValues == null || !UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    return;
  }
  for (int i = 0; i < aValues.length; i++) {
    UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
        "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, aMessageKey,
        new Object[] { Thread.currentThread().getName(), aProcessorName, String.valueOf(i),
            aValues[i] });
  }
}
/**
 * No-op for integrated deployment. Integrated CasProcessors are instantiated using UIMA framework
 * factory in the CPEFactory, so nothing is left to do here. Neither argument is read.
 *
 * @param aProcessingContainer container of the CasProcessor (unused)
 * @param redeploy true if the intent is to redeploy (unused)
 * @throws ResourceConfigurationException declared in the signature but never thrown by this
 *         empty body
 */
private void deployIntegrated(ProcessingContainer aProcessingContainer, boolean redeploy)
throws ResourceConfigurationException {
// Intentionally empty: integrated CasProcessors are deployed in the CPEFactory
}
/**
 * Deploys CasProcessor as a remote/network service. In this case, the CPE does not manage the
 * life-cycle of the Cas Processor; it simply creates proxies to the remote Cas Processor. On an
 * initial deployment one proxy per processing thread is requested, on a redeployment a single
 * one.
 * <p>
 * If the descriptor asks for exclusive service access and fewer services could be attached than
 * there are processing threads, the Cas Processors that ended up without a proxy are removed
 * from the container's pool. All other instances stay in the pool.
 *
 * @param aProcessingContainer container that will manage instances of the CasProcessor
 * @param redeploy flag indicating if this is a redeployment of a failed CasProcessor
 * @throws ResourceConfigurationException thrown as is, or wrapping any other exception
 */
private void deployRemote(ProcessingContainer aProcessingContainer, boolean redeploy)
    throws ResourceConfigurationException {
  CasProcessorConfiguration casProcessorConfig = aProcessingContainer
      .getCasProcessorConfiguration();
  String name = casProcessorConfig.getName();
  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
        "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_deploy_with_mode__FINEST",
        new Object[] { Thread.currentThread().getName(), name,
            casProcessorConfig.getDeploymentType() });
  }
  try {
    // Remote CasProcessors are accessed by service name lookup in VNS
    String serviceUri = getServiceUri(casProcessorConfig);
    // How many proxies to create to the remote service on an initial deployment
    int concurrentThreadCount = cpeFactory.getProcessingUnitThreadCount();
    boolean exclusiveAccess = "exclusive".equalsIgnoreCase(casProcessorConfig
        .getDeploymentParameter("service-access"));
    int howMany = redeploy ? 1 : concurrentThreadCount;
    int serviceCount = attachToServices(redeploy, serviceUri, howMany, aProcessingContainer);
    if (!redeploy && exclusiveAccess && serviceCount < concurrentThreadCount) {
      // Not enough exclusive services were found: shrink the pool to the instances that really
      // got a proxy. Each instance is checked out once and only checked back in if it stays.
      ServiceProxyPool pool = aProcessingContainer.getPool();
      int poolSize = pool.getSize();
      for (int i = 0; i < poolSize; i++) {
        CasProcessor processor = pool.checkOut();
        if (processor == null) {
          continue;
        }
        boolean withoutProxy = processor instanceof NetworkCasProcessorImpl
            && ((NetworkCasProcessorImpl) processor).getProxy() == null;
        if (withoutProxy) {
          // Deliberately not checked back in: this removes the instance from the pool.
          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_reducing_cp_instance_count__FINEST",
                new Object[] { Thread.currentThread().getName(), name,
                    casProcessorConfig.getDeploymentType() });
          }
        } else {
          // Every other instance, whatever its type, goes back into the pool so that it is
          // not lost.
          pool.checkIn(processor);
        }
      }
    }
  } catch (ResourceConfigurationException e) {
    throw e;
  } catch (Exception e) {
    throw new ResourceConfigurationException(e);
  }
}
/**
* Creates and attaches proxies to vinci services. For services with exclusive access, makes sure
* that there is only one proxy per service. Otherwise the services are shared.
*
* @param redeploy -
* true if the connection is made as part of the recovery due to previous service crash
* or termination.
* @param aServiceUri -
* service uri, this is a vinci service name
* @param howMany -
* how many proxies to create and connect
* @param aProcessingContainer -
* hosting container for the proxies. Proxies are assigned to a pool managed by the
* container.
*
* @return - how many proxies were created
* @throws Exception -
* error
*/
private int attachToServices(boolean redeploy, String aServiceUri, int howMany,
ProcessingContainer aProcessingContainer) throws Exception {
VinciServiceInfo serviceInfo = null;
// Retrieve configuration information for the CasProcessor. This is the configuration from the
// CPE descriptor
CasProcessorConfiguration casProcessorConfig = aProcessingContainer
.getCasProcessorConfiguration();
ArrayList serviceList = null;
// Check if this Cas Processor has exclusive service access. Meaning it requires service per
// proxy.
boolean exclusiveAccess = false;
String serviceAccess = casProcessorConfig.getDeploymentParameter("service-access");
if (serviceAccess != null && serviceAccess.equalsIgnoreCase("exclusive")) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(
Level.FINEST,
this.getClass().getName(),
"initialize",
CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_cp_with_exclusive_access__FINEST",
new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
casProcessorConfig.getDeploymentType() });
}
exclusiveAccess = true;
// The following is true if the service just crashed and we are trying to recover. The
// container in this case
// should contain a reference to the Cas Processor that failed.
if (((ProcessingContainer_Impl) aProcessingContainer).failedCasProcessorList.size() > 0) {
// In case the service starts on the same port as before it crashed, mark it as
// available in the current service list
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(
Level.FINEST,
this.getClass().getName(),
"initialize",
CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_retrieve_cp_from_failed_cplist__FINEST",
new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
casProcessorConfig.getDeploymentType() });
}
CasProcessor processor = (CasProcessor) ((ProcessingContainer_Impl) aProcessingContainer).failedCasProcessorList
.get(0);
// Extract old (stale) proxy from the CAS Processor
VinciTAP tap = ((NetworkCasProcessorImpl) processor).getProxy();
if (tap != null) {
// Since the new service may have started on the same machine and the same port, make sure
// that the
// current list is updated so that the service can be again connected to.
String service_host = tap.getServiceHost();
int service_port = tap.getServicePort();
// Go through all services in the in-use service list and try to find one that matches
// host and port
// If found, change the state of the service to available again.
for (int i = 0; currentServiceList != null && i < currentServiceList.size(); i++) {
VinciServiceInfo si = (VinciServiceInfo) currentServiceList.get(i);
if (si.getHost().equals(service_host) && si.getPort() == service_port) {
si.setAvailable(true);
}
}
}
}
// Retrieve a new service list from the VNS
serviceList = getNewServiceList(aServiceUri, casProcessorConfig);
if (serviceList == null || serviceList.size() == 0 && redeploy == false) {
throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_no_service_in_vns__FINEST", new Object[] {
Thread.currentThread().getName(), aServiceUri,
casProcessorConfig.getDeploymentType(),
casProcessorConfig.getDeploymentParameter(Constants.VNS_HOST),
casProcessorConfig.getDeploymentParameter(Constants.VNS_PORT) });
}
// When redeploying/reconnecting, we only interested in one proxy
if (redeploy) {
howMany = 1; // override
}
// If this is the first time through here, make sure the service list is cached. This is the
// current service list.
if (currentServiceList == null) {
currentServiceList = serviceList;
} else {
// Copy the state of each service from current list to new list. Just change the state
// of service from unassigned to assigned if it happens not to be in use.
int newServiceCount = VNSQuery.findUnassigned(currentServiceList, serviceList);
if (newServiceCount > 0) {
// new Services have been started. Use the new list as current.
currentServiceList.clear();
currentServiceList = serviceList;
}
}
}
int succesfullConnections = 0;
int rC = casProcessorConfig.getMaxRestartCount();
int maxTimeToWait = casProcessorConfig.getMaxTimeToWaitBetweenRetries();
// Never sleep indefinitely. Override if the maxTimeToWait = 0
if (maxTimeToWait == 0) {
maxTimeToWait = WAIT_TIME;
}
for (int i = 0; i < howMany; i++) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(
Level.FINEST,
this.getClass().getName(),
"initialize",
CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_activating_service__FINEST",
new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
String.valueOf(i), String.valueOf(howMany) });
}
int restartCount = 0;
// flag to indicate that we should sleep between retries when connecting to a fenced service.
// First time through the do-while loop below dont sleep though.
boolean sleepBetweenRetries = false;
// Attempt to connect to remote service and activate the proxy.
do {
try {
if (exclusiveAccess) {
serviceInfo = getNextAvailable(serviceList);
if (serviceInfo == null) {
if (redeploy == false) {
// No more services available. Report how many services we attached to so far
return succesfullConnections;
}
sleepBetweenRetries = true;
}
}
if (sleepBetweenRetries) {
try {
Thread.sleep(maxTimeToWait);
} catch (InterruptedException iex) {
}
if (serviceInfo == null && exclusiveAccess) {
// Get a new service list from the VNS
restartCount++;
serviceList = getNewServiceList(aServiceUri, casProcessorConfig);
// Copy the state of each service from current list to new list. Just change the state
// of service from unassigned to assigned if it happens to be in use.
int newServiceCount = VNSQuery.findUnassigned(currentServiceList, serviceList);
if (newServiceCount > 0) {
currentServiceList.clear();
currentServiceList = serviceList;
}
continue;
}
}
if (exclusiveAccess) {
activateProcessor(casProcessorConfig, serviceInfo.getHost(), serviceInfo.getPort(),
aProcessingContainer, redeploy);
serviceInfo.setAvailable(false);
} else {
// Create and connect a new proxy to the service
activateProcessor(casProcessorConfig, aServiceUri, aProcessingContainer, redeploy);
}
succesfullConnections++;
// on successfull connection, just brake out of the retry loop
break;
} catch (ResourceInitializationException ex) {
if (ex.getCause() instanceof ServiceException
|| ex.getCause() instanceof ServiceDownException) {
if (UIMAFramework.getLogger().isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(this.getClass()).logrb(
Level.WARNING,
this.getClass().getName(),
"initialize",
CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_activating_service__WARNING",
new Object[] { Thread.currentThread().getName(),
aProcessingContainer.getName() });
}
// Increment number of CasProcessor restarts (redeploys)
restartCount++;
} else {
restartCount = rC + 1; // Force termination of the loop
}
}
} while (restartCount <= rC);
// If exceeding defined max number of retries, take associated action with this condition
// CasProcessor may be configured to Terminate the CPM, Disable itself or Disregard this
// condition and continue.
if (restartCount > rC) {
handleMaxRestartThresholdReached(aProcessingContainer);
}
}
return succesfullConnections;
}
/**
 * Scans the given service list in order and returns the first entry that reports itself as
 * available.
 *
 * @param aServiceList list whose elements are cast to {@link VinciServiceInfo}; may be
 *          {@code null}
 * @return the first entry for which {@code isAvailable()} is true, or {@code null} when the list
 *         is {@code null}, empty, or contains no available entry
 */
private VinciServiceInfo getNextAvailable(ArrayList aServiceList) {
  // Guard against a missing list: report "nothing available" instead of failing with a
  // NullPointerException.
  if (aServiceList == null) {
    return null;
  }
  for (int i = 0; i < aServiceList.size(); i++) {
    VinciServiceInfo serviceInfo = (VinciServiceInfo) aServiceList.get(i);
    // Null slots and services already in use are skipped
    if (serviceInfo != null && serviceInfo.isAvailable()) {
      return serviceInfo;
    }
  }
  return null;
}
/**
 * Asks the configured VNS which services are registered under the given service URI.
 *
 * The VNS host and port are resolved through {@code getVNSSettingFor}, falling back to
 * "localhost" and "9000" respectively.
 *
 * @param aServiceUri -
 *          named service endpoint
 * @param aCasProcessorConfig -
 *          Cas Processor configuration
 * @return - List of services provided by VNS
 * @throws Exception passthru
 */
private ArrayList getNewServiceList(String aServiceUri,
        CasProcessorConfiguration aCasProcessorConfig) throws Exception {
  final String host = getVNSSettingFor("VNS_HOST", aCasProcessorConfig, "localhost");
  final String portText = getVNSSettingFor("VNS_PORT", aCasProcessorConfig, "9000");
  // The port is parsed before the query object is built, exactly as the VNSQuery constructor
  // arguments require.
  final int port = Integer.parseInt(portText);
  return new VNSQuery(host, port).getServices(aServiceUri);
}
/**
 * Carries out the action configured for a CasProcessor once the number of connection attempts
 * has exceeded the maximum restart count.
 *
 * The value returned by {@code getActionOnMaxRestart()} is handled as follows:
 * {@code Constants.TERMINATE_CPE} raises an AbortCPMException and
 * {@code Constants.KILL_PROCESSING_PIPELINE} raises a KillPipelineException (each wrapped in a
 * ResourceConfigurationException); {@code Constants.DISABLE_CASPROCESSOR} sets the container
 * status to {@code Constants.CAS_PROCESSOR_DISABLED}; any other value only produces a log entry.
 *
 * @param aProcessingContainer -
 *          container holding CasProcessor configuration
 *
 * @throws ResourceConfigurationException when configured to terminate the CPE or to kill the
 *           processing pipeline
 */
private void handleMaxRestartThresholdReached(ProcessingContainer aProcessingContainer)
        throws ResourceConfigurationException {
  final CasProcessorConfiguration config = aProcessingContainer.getCasProcessorConfiguration();
  final String casProcessorName = config.getName();

  if (UIMAFramework.getLogger().isLoggable(Level.WARNING)) {
    UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(),
            "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_restart_exceeded__WARNING",
            new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
                String.valueOf(config.getMaxRestartCount()) });
  }
  // Same message key as above, but reported at FINEST with the configured action as last argument
  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
            "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_restart_exceeded__WARNING",
            new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
                config.getActionOnMaxRestart() });
  }

  final String action = config.getActionOnMaxRestart();

  // Action: terminate the whole CPE
  if (Constants.TERMINATE_CPE.equals(action)) {
    if (UIMAFramework.getLogger().isLoggable(Level.INFO)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.INFO, this.getClass().getName(),
              "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_terminate_onerrors__INFO",
              new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName() });
    }
    throw new ResourceConfigurationException(new AbortCPMException(
            CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_EXP_configured_to_abort__WARNING",
                    new Object[] { Thread.currentThread().getName(), casProcessorName })));
  }

  // Action: kill only the processing pipeline
  if (Constants.KILL_PROCESSING_PIPELINE.equals(action)) {
    if (UIMAFramework.getLogger().isLoggable(Level.INFO)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.INFO, this.getClass().getName(),
              "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_terminate_pipeline__INFO",
              new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName() });
    }
    throw new ResourceConfigurationException(new KillPipelineException(
            CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_EXP_configured_to_kill_pipeline__WARNING",
                    new Object[] { Thread.currentThread().getName(), casProcessorName })));
  }

  // Action: disable this CasProcessor. Only this action changes the container status; the log
  // entry below is written for this action and for every remaining (unrecognized) action alike.
  if (Constants.DISABLE_CASPROCESSOR.equals(action)) {
    aProcessingContainer.setStatus(Constants.CAS_PROCESSOR_DISABLED);
  }
  if (UIMAFramework.getLogger().isLoggable(Level.INFO)) {
    UIMAFramework.getLogger(this.getClass()).logrb(Level.INFO, this.getClass().getName(),
            "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_disable_cp__INFO",
            new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName() });
  }
}
/**
 * Reads the service URI out of the URISpecifier found at the CasProcessor's descriptor URL.
 *
 * @param aCasProcessorConfig -
 *          CasProcessor configuration containing service descriptor path
 * @return - name of the service
 * @throws ResourceConfigurationException if the uri is missing or empty
 */
private String getServiceUri(CasProcessorConfiguration aCasProcessorConfig)
        throws ResourceConfigurationException {
  final URL descriptorUrl = aCasProcessorConfig.getDescriptorUrl();
  final String uri = getURISpecifier(descriptorUrl).getUri();

  // A usable URI is non-null and has at least one non-whitespace character
  final boolean usable = uri != null && uri.trim().length() != 0;
  if (usable) {
    return uri;
  }
  throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
          "UIMA_CPM_invalid_deployment__SEVERE",
          new Object[] { Thread.currentThread().getName(), descriptorUrl, uri });
}
/**
 * Parses the descriptor at the given URL and returns it as a URISpecifier.
 *
 * @param aDescriptorUrl location of the service descriptor
 * @return the parsed descriptor, cast to URISpecifier
 * @throws ResourceConfigurationException if the descriptor cannot be parsed or the parsed
 *           resource specifier is not a URISpecifier
 */
private URISpecifier getURISpecifier(URL aDescriptorUrl) throws ResourceConfigurationException {
  final ResourceSpecifier parsed = getSpecifier(aDescriptorUrl);
  if (parsed instanceof URISpecifier) {
    return (URISpecifier) parsed;
  }
  // Any other kind of specifier (including null) is an invalid deployment
  throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
          "UIMA_CPM_invalid_deployment__SEVERE",
          new Object[] { Thread.currentThread().getName(), aDescriptorUrl, null });
}
/**
 * Parses given service descriptor and returns initialized ResourceSpecifier.
 *
 * @param aUrl -
 *          URL of the descriptor
 * @return - ResourceSpecifier parsed from descriptor
 * @throws ResourceConfigurationException if the descriptor cannot be read or parsed; the
 *           underlying failure is attached as the cause
 */
private ResourceSpecifier getSpecifier(URL aUrl) throws ResourceConfigurationException {
  try {
    XMLInputSource in = new XMLInputSource(aUrl);
    return UIMAFramework.getXMLParser().parseResourceSpecifier(in);
  } catch (Exception e) {
    // Report through the framework logger rather than stderr
    if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
      UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE,
              Thread.currentThread().getName() + "", e);
    }
    // Chain the original failure so callers can see why the descriptor was rejected
    throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
            "UIMA_CPM_invalid_deployment__SEVERE",
            new Object[] { Thread.currentThread().getName(), aUrl, null }, e);
  }
}
/**
 * Deploys internal VNS for use with local CasProcessor deployments.
 *
 * A LocalVNS is created and started on its own thread only if none exists yet (the {@code vns}
 * field is still null); otherwise the existing instance is left untouched. On redeploy the
 * restart counter is incremented and added to the first service port handed to a newly created
 * VNS.
 *
 * @param casProcessorConfig -
 *          CasProcessor configuration
 * @param redeploy -
 *          flag indicating if VNS being redeployed
 * @throws CasProcessorDeploymentException if the LocalVNS cannot be created or started
 */
private void deployVNS(CasProcessorConfiguration casProcessorConfig, boolean redeploy)
        throws CasProcessorDeploymentException {
  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    // Logged at the level that is checked above (was emitted as SEVERE)
    UIMAFramework.getLogger(this.getClass()).logrb(
            Level.FINEST,
            this.getClass().getName(),
            "initialize",
            CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
            "UIMA_CPM_deploy_vns_redeploy__FINEST",
            new Object[] { Thread.currentThread().getName(), String.valueOf(redeploy),
                String.valueOf(restartCount) });
  }
  int vnsPort = Integer.parseInt(DEFAULT_VNS_PORT);
  int startPort = DEFAULT_SERVICE_PORT;
  int maxPort = DEFAULT_SERVICE_PORT + DEFAULT_SERVICE_PORT_RANGE;
  // Check if want to override default port for the VNS
  // To override use -DLOCAL_VNS_PORT=value
  String localVnsPort = System.getProperty("LOCAL_VNS_PORT");
  if (localVnsPort != null) {
    try {
      vnsPort = Integer.parseInt(localVnsPort);
    } catch (NumberFormatException e) {
      // A malformed override is not fatal: keep the default port and report the problem
      if (UIMAFramework.getLogger().isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(this.getClass()).log(
                Level.WARNING,
                Thread.currentThread().getName() + " Ignoring invalid LOCAL_VNS_PORT value '"
                        + localVnsPort + "'; using default VNS port " + vnsPort, e);
      }
    }
  }
  if (redeploy) {
    // Shift the first candidate service port by the number of restarts so far
    restartCount++;
    startPort += restartCount;
  }
  synchronized (VinciCasProcessorDeployer.class) {
    if (vns == null) {
      try {
        vns = new LocalVNS(startPort, maxPort, vnsPort);
        vns.setConnectionPool(portQueue);
        localVNSThread = new Thread(vns);
        localVNSThread.start();
      } catch (Exception e) {
        throw new CasProcessorDeploymentException(e);
      }
    }
  }
}
// Never called
// /**
// * Creates a proxy and connects it to Vinci service running on a given port. Once connected the
// * proxy is associated with Cas Processor.
// *
// * @param aCasProcessorConfig -
// * CasProcessor configuration
// * @param port -
// * port wher vinci service is listening
// * @return Connected proxy to service
// *
// * @throws ResourceInitializationException
// * @throws ResourceConfigurationException
// */
// private synchronized boolean activateProcessor(CasProcessorConfiguration aCasProcessorConfig,
// String aHost, int aPort) throws ResourceInitializationException,
// ResourceConfigurationException {
// // Instantiate proxy from given configuration
// VinciTAP tap = getTextAnalysisProxy(aCasProcessorConfig);
// if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
// UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
// "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
// "UIMA_CPM_activating_service_on_port__FINEST",
// new Object[] { Thread.currentThread().getName(), String.valueOf(aPort) });
// }
// boolean tryAgain = true;
// int maxCount = aCasProcessorConfig.getMaxRestartCount();
// while (tryAgain) {
// try {
// // Connect proxy to service running on given port on the same machine as the CPM
// tap.connect(aHost, aPort);
// // Connection established no need to retry
// tryAgain = false;
// } catch (Exception e) {
// if (maxCount-- == 0) {
// e.printStackTrace();
// if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
// UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE,
// Thread.currentThread().getName() + "", e);
// }
// throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
// "UIMA_CPM_no_service_connection__SEVERE", new Object[] {
// Thread.currentThread().getName(), String.valueOf(aPort), aHost,
// aCasProcessorConfig.getName() });
// }
// try {
// synchronized (monitor) {
// monitor.wait(SLEEP_TIME);
// }
// } catch (InterruptedException ex) {
// }
// }
// }
// // At this point there is a connected proxy (tap), now we need to add it to container that will
// // menage it.
// bindProxyToNetworkCasProcessor(tap);
// return true;
// }
/**
 * Creates a proxy and connects it to Vinci service running on a given host and port. Once
 * connected the proxy is associated with Cas Processor.
 *
 * Failed connection attempts are retried, waiting on the monitor for up to SLEEP_TIME between
 * attempts, until the configured maximum restart count is used up.
 *
 * @param aCasProcessorConfig -
 *          CasProcessor configuration
 * @param aHost host the service is running on
 * @param aPort port the service is listening on
 * @param aProcessingContainer container that will manage the connected proxy
 * @param redeploy true when reconnecting a previously failed Cas Processor
 * @return always {@code true}; failures are reported through exceptions
 * @throws ResourceConfigurationException if no connection could be established within the
 *           allowed number of attempts; the last connection failure is attached as the cause
 * @throws Exception passthru
 */
private synchronized boolean activateProcessor(CasProcessorConfiguration aCasProcessorConfig,
        String aHost, int aPort, ProcessingContainer aProcessingContainer, boolean redeploy)
        throws ResourceConfigurationException, Exception {
  // Instantiate proxy from given configuration
  VinciTAP tap = getTextAnalysisProxy(aCasProcessorConfig);
  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    // Logged at the level that is checked above (was emitted as SEVERE)
    UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
            "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
            "UIMA_CPM_activating_service_on_port__FINEST",
            new Object[] { Thread.currentThread().getName(), String.valueOf(aPort) });
  }
  boolean tryAgain = true;
  int retriesLeft = aCasProcessorConfig.getMaxRestartCount();
  while (tryAgain) {
    try {
      // Connect proxy to the service running on the given host and port
      tap.connect(aHost, aPort);
      // Connection established no need to retry
      tryAgain = false;
    } catch (Exception e) {
      // "<= 0" rather than "== 0" so a negative configured count cannot retry forever
      if (retriesLeft-- <= 0) {
        if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
          UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE,
                  Thread.currentThread().getName() + "", e);
        }
        throw new ResourceConfigurationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_no_service_connection__SEVERE", new Object[] {
                    Thread.currentThread().getName(), String.valueOf(aPort), aHost,
                    aCasProcessorConfig.getName() }, e);
      }
      try {
        synchronized (monitor) {
          monitor.wait(SLEEP_TIME);
        }
      } catch (InterruptedException ignored) {
        // An interrupt only cuts the pause short; the next connection attempt follows
        // immediately.
      }
    }
  }
  // At this point there is a connected proxy (tap), now we need to add it to container that will
  // manage it.
  bindProxyToNetworkCasProcessor(tap, aProcessingContainer, redeploy);
  return true;
}
/**
 * Creates a proxy and connects it to Vinci service. Uses VNS to locate service by name.
 *
 * The first attempt is made immediately; each later attempt is preceded by a wait of up to the
 * configured maximum time between retries (WAIT_TIME when that setting is 0).
 *
 * @param aCasProcessorConfig -
 *          CasProcessor configuration
 * @param aService -
 *          name of the vinci service
 * @param aProcessingContainer container that will manage the connected proxy
 * @param redeploy true when reconnecting a previously failed Cas Processor
 * @return always {@code true}; failures are reported through exceptions
 * @throws ResourceInitializationException (declared as Exception) with a
 *           ServiceConnectionException as cause when the allowed number of attempts is used up
 * @throws Exception passthru
 */
private synchronized boolean activateProcessor(CasProcessorConfiguration aCasProcessorConfig,
        String aService, ProcessingContainer aProcessingContainer, boolean redeploy)
        throws Exception {
  VinciTAP tap = getTextAnalysisProxy(aCasProcessorConfig);
  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
            "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
            "UIMA_CPM_activating_service_on_port__FINEST",
            new Object[] { Thread.currentThread().getName(), aService });
  }
  boolean tryAgain = true;
  // Keep the configured limit separate from the countdown so that messages report the limit
  // itself and not the already-decremented counter.
  final int maxRetries = aCasProcessorConfig.getMaxRestartCount();
  int retriesLeft = maxRetries;
  int maxTimeToWait = aCasProcessorConfig.getMaxTimeToWaitBetweenRetries();
  // Never sleep indefinitely. Override if the maxTimeToWait = 0
  if (maxTimeToWait == 0) {
    maxTimeToWait = WAIT_TIME;
  }
  // Initially dont sleep
  boolean sleepBetweenRetries = false;
  while (tryAgain) {
    try {
      if (sleepBetweenRetries) {
        // The method is synchronized, so this waits on (and temporarily releases) this object's
        // monitor.
        wait(maxTimeToWait);
      } else {
        sleepBetweenRetries = true;
      }
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_connecting_to_service__FINEST",
                new Object[] { Thread.currentThread().getName(), aService });
      }
      // Try to connect to service using its service name
      tap.connect(aService);
      tryAgain = false;
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_connection_established__FINEST",
                new Object[] { Thread.currentThread().getName(), aService });
      }
    } catch (Exception e) {
      // "<= 0" rather than "== 0" so a negative configured count cannot retry forever
      if (retriesLeft-- <= 0) {
        if (UIMAFramework.getLogger().isLoggable(Level.WARNING)) {
          // The thrown exception below does not carry 'e', so record it here
          UIMAFramework.getLogger(this.getClass()).log(Level.WARNING,
                  Thread.currentThread().getName() + "", e);
          UIMAFramework.getLogger(this.getClass()).logrb(
                  Level.WARNING,
                  this.getClass().getName(),
                  "initialize",
                  CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_max_connect_retries_exceeded__FINEST",
                  new Object[] { Thread.currentThread().getName(), aService,
                      String.valueOf(maxRetries) });
        }
        // The cause stays a ServiceConnectionException, as before.
        // NOTE(review): the bundle name appears to be passed where this constructor form expects
        // a message key - confirm against ResourceInitializationException.
        throw new ResourceInitializationException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE, new Object[] {
            Thread.currentThread().getName(), aService, String.valueOf(maxRetries) },
                new ServiceConnectionException("Unable to connect to service :::" + aService));
      }
    }
  }
  // Connected to the service so add tap to Cas Processor
  bindProxyToNetworkCasProcessor(tap, aProcessingContainer, redeploy);
  return true;
}
// Never called
// /**
// * Associates connected proxy with an instance of CasProcessor.
// *
// * @param aTap -
// * connected proxy
// */
// private void bindProxyToNetworkCasProcessor(VinciTAP aTap) {
// if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
// UIMAFramework.getLogger(this.getClass())
// .logrb(
// Level.FINEST,
// this.getClass().getName(),
// "initialize",
// CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
// "UIMA_CPM_get_cp_from_pool__FINEST",
// new Object[] { Thread.currentThread().getName(), aTap.getServiceHost(),
// String.valueOf(aTap.getServicePort()),
// String.valueOf(casProcessorPool.getSize()) });
// }
// synchronized (casProcessorPool) {
// if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
// UIMAFramework.getLogger(this.getClass()).logrb(
// Level.FINEST,
// this.getClass().getName(),
// "initialize",
// CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
// "UIMA_CPM_get_cp_from_pool__FINEST",
// new Object[] { Thread.currentThread().getName(), aTap.getServiceHost(),
// String.valueOf(aTap.getServicePort()),
// String.valueOf(casProcessorPool.getSize()) });
// }
// CasProcessor processor = null;
// // There are as many Cas Processors as there are Procesing Threads. All of them have been
// // already created and exist
// // in the cas processor pool. Cas Processor may or may not already have an associated proxy.
// // So here we cycle through all Cas Processors until we find one that has not yet have a proxy
// // and
// // add it.
// for (int i = 0; i < casProcessorPool.getSize(); i++) {
// try {
// // Check out the next Cas Processor from the pool
// processor = casProcessorPool.checkOut();
// if (processor == null) {
// if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
// UIMAFramework.getLogger(this.getClass()).logrb(
// Level.SEVERE,
// this.getClass().getName(),
// "initialize",
// CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
// "UIMA_CPM_get_cp_from_pool_error__SEVERE",
// new Object[] { Thread.currentThread().getName(), aTap.getServiceHost(),
// String.valueOf(aTap.getServicePort()),
// String.valueOf(casProcessorPool.getSize()) });
// }
// continue;
// }
// // Add proxy only to instances of NetworkCasProcessorImpl
// if (processor instanceof NetworkCasProcessorImpl) {
// NetworkCasProcessorImpl netProcessor = (NetworkCasProcessorImpl) processor;
// // Check if this Cas Processor has already been assigned a proxy. If so,
// // get the next Cas Processor
// if (netProcessor.getProxy() != null && netProcessor.getProxy().isConnected()) {
// if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
// UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
// this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
// "UIMA_CPM_already_allocated__FINEST",
// new Object[] { Thread.currentThread().getName(), String.valueOf(i) });
// }
// continue;
// }
// if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
// UIMAFramework.getLogger(this.getClass()).logrb(
// Level.FINEST,
// this.getClass().getName(),
// "initialize",
// CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
// "UIMA_CPM_assign_cp_to_service__FINEST",
// new Object[] { Thread.currentThread().getName(), String.valueOf(i),
// aTap.getServiceHost(), String.valueOf(aTap.getServicePort()) });
// }
// // Associate the proxy with this Cas Processor
// ((NetworkCasProcessorImpl) processor).setProxy(aTap);
// synchronized (monitor) {
//
// monitor.notifyAll();
// }
// }
// break;
// } catch (Exception e) {
// e.printStackTrace();
// } finally {
// if (processor != null) {
// if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
// UIMAFramework.getLogger(this.getClass()).logrb(
// Level.FINEST,
// this.getClass().getName(),
// "initialize",
// CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
// "UIMA_CPM_checkin_cp_to_pool__FINEST",
// new Object[] { Thread.currentThread().getName(), String.valueOf(i),
// aTap.getServiceHost(), String.valueOf(aTap.getServicePort()) });
// }
// casProcessorPool.checkIn(processor);
// if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
// UIMAFramework.getLogger(this.getClass()).logrb(
// Level.FINEST,
// this.getClass().getName(),
// "initialize",
// CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
// "UIMA_CPM_checked_in_cp_to_pool__FINEST",
// new Object[] { Thread.currentThread().getName(), String.valueOf(i),
// aTap.getServiceHost(), String.valueOf(aTap.getServicePort()) });
// }
// }
// }
// }
// }
// }
/**
 * Hands a connected proxy to a CasProcessor instance.
 *
 * When redeploying, the target is the first entry of the container's failed CasProcessor list.
 * Otherwise instances are checked out of the CasProcessor pool one at a time until a
 * NetworkCasProcessorImpl without a connected proxy is found. A processor that received the
 * proxy is checked in to the pool; a processor that was selected but did not receive the proxy
 * is appended to the container's failed CasProcessor list.
 *
 * @param aTap -
 *          connected proxy
 * @param aProcessingContainer container, cast to ProcessingContainer_Impl to reach its failed
 *          CasProcessor list
 * @param redeploy true to bind to a previously failed CasProcessor instead of a pooled one
 * @throws Exception a ResourceProcessException when redeploying with an empty failed list;
 *           anything else is passed through
 */
private void bindProxyToNetworkCasProcessor(VinciTAP aTap,
        ProcessingContainer aProcessingContainer, boolean redeploy) throws Exception {
  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
            "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
            "UIMA_CPM_checked_in_cp_to_pool__FINEST",
            new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
                aTap.getServiceHost(), String.valueOf(aTap.getServicePort()) });
  }

  CasProcessor candidate = null;
  boolean bound = false;
  try {
    if (redeploy) {
      // Reconnect case: the target must already be waiting in the failed list
      if (((ProcessingContainer_Impl) aProcessingContainer).failedCasProcessorList.isEmpty()) {
        throw new ResourceProcessException(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_no_cp_instance_in_failed_list__FINEST",
                new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
                    aTap.getServiceHost(), String.valueOf(aTap.getServicePort()) },
                new Exception(CpmLocalizedMessage.getLocalizedMessage(
                        CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                        "UIMA_CPM_EXP_cp_not_in_failed_list__WARNING",
                        new Object[] { Thread.currentThread().getName(),
                            aProcessingContainer.getName() })));
      }
      candidate = (CasProcessor) ((ProcessingContainer_Impl) aProcessingContainer).failedCasProcessorList
              .get(0);
      // Add proxy only to instances of NetworkCasProcessorImpl
      if (candidate instanceof NetworkCasProcessorImpl) {
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                  this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_assign_cp_to_service__FINEST",
                  new Object[] { Thread.currentThread().getName(),
                      aProcessingContainer.getName(), aTap.getServiceHost(),
                      String.valueOf(aTap.getServicePort()) });
        }
        // Associate the proxy with this Cas Processor and wake up anyone waiting on the monitor
        ((NetworkCasProcessorImpl) candidate).setProxy(aTap);
        bound = true;
        synchronized (monitor) {
          monitor.notifyAll();
        }
      }
    } else {
      // Initial deployment: try at most as many instances as the pool reports up front
      final int attempts = casProcessorPool.getSize();
      for (int i = 0; i < attempts; i++) {
        candidate = casProcessorPool.checkOut();
        if (candidate == null) {
          if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE,
                    this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_get_cp_from_pool_error__SEVERE",
                    new Object[] { Thread.currentThread().getName(), aTap.getServiceHost(),
                        String.valueOf(aTap.getServicePort()),
                        String.valueOf(casProcessorPool.getAllInstanceCount()) });
          }
          continue;
        }
        // Add proxy only to instances of NetworkCasProcessorImpl
        // NOTE(review): a candidate skipped here or below is not checked back in before the next
        // checkOut() - confirm whether the pool expects that.
        if (!(candidate instanceof NetworkCasProcessorImpl)) {
          continue;
        }
        NetworkCasProcessorImpl netProcessor = (NetworkCasProcessorImpl) candidate;
        // Skip Cas Processors that already hold a connected proxy
        if (netProcessor.getProxy() != null && netProcessor.getProxy().isConnected()) {
          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                    this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_already_allocated__FINEST",
                    new Object[] { Thread.currentThread().getName(), String.valueOf(i) });
          }
          continue;
        }
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                  this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_assign_cp_to_service__FINEST",
                  new Object[] { Thread.currentThread().getName(), String.valueOf(i),
                      aTap.getServiceHost(), String.valueOf(aTap.getServicePort()) });
        }
        // Associate the proxy with this Cas Processor and wake up anyone waiting on the monitor
        ((NetworkCasProcessorImpl) candidate).setProxy(aTap);
        bound = true;
        synchronized (monitor) {
          monitor.notifyAll();
        }
        break;
      }
    }
  } finally {
    // Runs on success and on failure alike; only the most recently selected candidate is handled
    if (candidate != null) {
      if (!bound) {
        // Put it back into the failed Cas Processor List for subsequent retries
        ((ProcessingContainer_Impl) aProcessingContainer).failedCasProcessorList.add(candidate);
      } else {
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                  this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_checkin_cp_to_pool__FINEST",
                  new Object[] { Thread.currentThread().getName(),
                      aProcessingContainer.getName(), aTap.getServiceHost(),
                      String.valueOf(aTap.getServicePort()) });
        }
        casProcessorPool.checkIn(candidate);
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                  this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_checked_in_cp_to_pool__FINEST",
                  new Object[] { Thread.currentThread().getName(),
                      aProcessingContainer.getName(), aTap.getServiceHost(),
                      String.valueOf(aTap.getServicePort()) });
        }
      }
    }
  }
}
/**
 * Creates and initializes proxy that will be used to connect to Vinci service.
 *
 * The proxy is configured with the VNS host and port, the keys to drop (when configured), the
 * response timeout and, unless the configured timer implementation is null, a timer.
 *
 * @param aCasProcessorConfig -
 *          CasProcessor configuration
 * @return - new proxy (not yet connected to service)
 * @throws ResourceConfigurationException if the local VNS port cannot be determined, a VNS
 *           setting cannot be resolved, or the timer cannot be created
 */
private VinciTAP getTextAnalysisProxy(CasProcessorConfiguration aCasProcessorConfig)
        throws ResourceConfigurationException {
  VinciTAP tap = new VinciTAP();
  String vnsHost;
  String vnsPort;
  // For 'local' or managed Cas Processor, get the VNS port
  if (aCasProcessorConfig.getDeploymentType() != null
          && Constants.DEPLOYMENT_LOCAL.equalsIgnoreCase(aCasProcessorConfig.getDeploymentType())) {
    vnsHost = "localhost"; // default for local deployment
    try {
      vnsPort = String.valueOf(vns.getPort());
    } catch (Exception e) {
      // Also covers the case where no local VNS has been created yet
      throw new ResourceConfigurationException(e);
    }
  } else {
    // try to retrieve VNS location (host and port) from the service descriptor. If not found,
    // try to find the settings in the following order:
    // 1) check CPE descriptor settings (in <deploymentParameters>)
    // 2) check System property (set via -D on the command line)
    // 3) use defaults ( VNS_HOST=localhost and VNS_PORT=9000)
    vnsHost = getVNSSettingFor("VNS_HOST", aCasProcessorConfig, "localhost");
    vnsPort = getVNSSettingFor("VNS_PORT", aCasProcessorConfig, "9000");
  }
  // Get the max timeout the CPE will wait for response from the Cas Processor
  long timeout = aCasProcessorConfig.getTimeout();
  String[] keysToDrop = aCasProcessorConfig.getKeysToDrop();
  if (keysToDrop != null) {
    tap.setKeys2Drop(keysToDrop);
  }
  // Configure the proxy with VNS settings and timeout
  tap.setVNSHost(vnsHost);
  tap.setVNSPort(vnsPort);
  // Clamp before narrowing so that a very large timeout does not wrap around into a small or
  // negative int.
  tap.setTimeout((int) Math.min(timeout, Integer.MAX_VALUE));
  String timerClass = "";
  try {
    timerClass = cpeFactory.getCPEConfig().getTimerImpl();
  } catch (Exception ignored) {
    // ignore. Use default: timerClass stays non-null, so a timer is still set below
  }
  if (timerClass != null) {
    try {
      tap.setTimer(CPMUtils.getTimer(cpeFactory.getCPEConfig().getCpeTimer().get()));
    } catch (Exception e) {
      // The failure travels with the thrown exception as its cause
      throw new ResourceConfigurationException(e);
    }
  }
  return tap;
}
/**
 * Resolves the value of a named VNS parameter (either {@code VNS_HOST} or {@code VNS_PORT}).
 * The sources below are consulted in order and the first non-null value wins:
 * <ol>
 * <li>the parameter of that name in the service descriptor</li>
 * <li>the matching deployment parameter in the CPE descriptor (where the keys are
 * {@code Constants.VNS_HOST} / {@code Constants.VNS_PORT} rather than the upper-case names)</li>
 * <li>the System property of that name (set with -D on the command line)</li>
 * <li>the supplied default</li>
 * </ol>
 *
 * @param aVNSParamKey
 *          name of the VNS parameter for which the value is sought
 * @param aCasProcessorConfig
 *          CPE descriptor settings
 * @param aDefault
 *          value returned when no source defines the parameter
 * @return value for the named VNS parameter
 * @throws ResourceConfigurationException
 *           passed through from the service descriptor lookup
 */
private String getVNSSettingFor(String aVNSParamKey,
        CasProcessorConfiguration aCasProcessorConfig, String aDefault)
        throws ResourceConfigurationException {
  // 1) service descriptor
  URISpecifier serviceSpecifier = getURISpecifier(aCasProcessorConfig.getDescriptorUrl());
  Parameter[] declared = serviceSpecifier.getParameters();
  String resolved = null;
  if (declared != null) {
    for (Parameter candidate : declared) {
      if (aVNSParamKey.equals(candidate.getName())) {
        resolved = candidate.getValue();
        break;
      }
    }
  }
  if (resolved != null) {
    return resolved;
  }

  // 2) CPE descriptor; its keys differ from the upper-case names used in the service descriptor
  if (aVNSParamKey.equals("VNS_HOST")) {
    resolved = aCasProcessorConfig.getDeploymentParameter(Constants.VNS_HOST);
  } else if (aVNSParamKey.equals("VNS_PORT")) {
    resolved = aCasProcessorConfig.getDeploymentParameter(Constants.VNS_PORT);
  }
  if (resolved != null) {
    return resolved;
  }

  // 3) -D system property, then 4) the hard-coded default
  String fromSystemProperty = System.getProperty(aVNSParamKey);
  return fromSystemProperty != null ? fromSystemProperty : aDefault;
}
/**
 * Used while launching local (managed) Cas Processors: establishes connections to the Cas
 * Processors running as Vinci services. Each managed Cas Processor contacts the local VNS on
 * startup and is assigned a port; that port is placed on {@code portQueue}. This method takes
 * {@code howMany} ports off the queue, one per iteration, and activates a processor on
 * {@code 127.0.0.1} for each of them.
 * <p>
 * Every {@link ConnectException} thrown here carries the underlying exception as its cause.
 *
 * @param aProcessingContainer
 *          container the activated processors belong to
 * @param casProcessorConfig
 *          CasProcessor configuration
 * @param redeploy
 *          flag indicating whether this is a restart
 * @param howMany
 *          how many connections to make
 * @throws ConnectException
 *           if no port becomes available in time, the port is not a valid number, or the
 *           processor cannot be activated
 */
private void connectToServices(ProcessingContainer aProcessingContainer,
        CasProcessorConfiguration casProcessorConfig, boolean redeploy, int howMany)
        throws ConnectException {
  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
    UIMAFramework.getLogger(this.getClass()).logrb(
            Level.FINEST,
            this.getClass().getName(),
            "initialize",
            CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
            "UIMA_CPM_thread_count__FINEST",
            new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
                String.valueOf(howMany) });
  }
  // Create as many proxies as there are concurrent processing pipelines. The assumption is
  // that the services have started up and retrieved ports to run on from the local VNS
  // managed by the CPE.
  for (int i = 0; i < howMany; i++) {
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(
              Level.FINEST,
              this.getClass().getName(),
              "initialize",
              CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
              "UIMA_CPM_activating_service_on_port2__FINEST",
              new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
                  String.valueOf(i), String.valueOf(howMany) });
    }
    String portText = null;
    int port;
    try {
      // Get the next port from the queue. This queue is filled by the local VNS during
      // service registration: on startup each service registers itself with the local VNS
      // and is given a port to run on, which the VNS then enqueues onto the portQueue.
      portText = getPort(portQueue);
      port = Integer.parseInt(portText);
    } catch (TimeLimitExceededException e) {
      throw connectFailure(CpmLocalizedMessage.getLocalizedMessage(
              CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_EXP_no_service_port__WARNING",
              new Object[] { Thread.currentThread().getName() }), e);
    } catch (Exception e) {
      // Covers an unparsable port as well as any other failure while fetching it; portText
      // is still null when the failure happened before a port was obtained.
      throw connectFailure(CpmLocalizedMessage.getLocalizedMessage(
              CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_EXP_invalid_service_port__WARNING",
              new Object[] { Thread.currentThread().getName(),
                  portText != null ? portText : "Not Available" }), e);
    }
    // Activate the CasProcessor by instantiating a proxy to the vinci service running
    // on the given port.
    try {
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(
                Level.FINEST,
                this.getClass().getName(),
                "initialize",
                CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_activating_service_on_port__FINEST",
                new Object[] { Thread.currentThread().getName(), aProcessingContainer.getName(),
                    String.valueOf(port) });
      }
      // There is a valid port, so connect to it
      activateProcessor(casProcessorConfig, "127.0.0.1", port, aProcessingContainer, redeploy);
    } catch (Exception e) {
      throw connectFailure(e.getMessage(), e);
    }
  }
}

/**
 * Builds a ConnectException that keeps the original failure as its cause. ConnectException
 * has no (message, cause) constructor, so the cause is attached with initCause.
 *
 * @param message
 *          detail message for the new exception
 * @param cause
 *          the failure being translated
 * @return the exception to throw
 */
private ConnectException connectFailure(String message, Throwable cause) {
  ConnectException failure = new ConnectException(message);
  failure.initCause(cause);
  return failure;
}
/**
 * Used during a launch of the managed (local) Cas Processor, this method returns the port on
 * which the Cas Processor is waiting for requests. Each Cas Processor is given a port by the
 * local VNS, and those ports are made available in the queue <i>portQueue</i>.
 * <p>
 * While the queue is empty the method sleeps {@code WAIT_TIME} between checks. The number of
 * extra checks is taken from the System property named by {@code CONN_RETRY_COUNT}, or
 * {@code MAX_WAIT_TRIES} when that property is absent or not a valid integer. An interrupt
 * only cuts the current sleep short; the interrupt status is restored once the wait is over.
 *
 * @param portQueue
 *          queue containing ports assigned to services by local VNS
 * @return port as String
 * @throws TimeLimitExceededException
 *           if the queue is still empty after the allowed number of checks
 */
private String getPort(BoundedWorkQueue portQueue) throws TimeLimitExceededException {
  int waitCount = MAX_WAIT_TRIES; // default
  String configuredRetries = System.getProperty(CONN_RETRY_COUNT);
  if (configuredRetries != null) {
    try {
      waitCount = Integer.parseInt(configuredRetries);
    } catch (NumberFormatException e) {
      // Invalid override: report it and keep the default
      if (UIMAFramework.getLogger().isLoggable(Level.INFO)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.INFO, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_invalid_retry_count__INFO",
                new Object[] { Thread.currentThread().getName() });
      }
    }
  }
  if (UIMAFramework.getLogger().isLoggable(Level.INFO)) {
    UIMAFramework.getLogger(this.getClass()).logrb(Level.INFO, this.getClass().getName(),
            "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_got_port_from_queue__INFO",
            new Object[] { Thread.currentThread().getName(), "" });
  }
  long sleeps = 0; // number of WAIT_TIME pauses taken so far, used for the timeout message
  boolean interrupted = false;
  try {
    while (portQueue.getCurrentSize() == 0) {
      if (UIMAFramework.getLogger().isLoggable(Level.INFO)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.INFO, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_service_port_not_allocated__INFO",
                new Object[] { Thread.currentThread().getName(), String.valueOf(waitCount) });
      }
      try {
        Thread.sleep(WAIT_TIME);
      } catch (InterruptedException e) {
        // Keep polling as before, but remember the interrupt so it is not lost
        interrupted = true;
      }
      sleeps++;
      if (waitCount-- <= 0) {
        // Report the time actually spent waiting. waitCount is already negative at this
        // point, so it must not be used to compute the duration.
        throw new TimeLimitExceededException(CpmLocalizedMessage.getLocalizedMessage(
                CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_EXP_timeout_no_service_port__WARNING", new Object[] {
                    Thread.currentThread().getName(),
                    String.valueOf(sleeps * WAIT_TIME + " millis") }));
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
  Object portObject = portQueue.dequeue();
  if (UIMAFramework.getLogger().isLoggable(Level.INFO)) {
    UIMAFramework.getLogger(this.getClass()).logrb(Level.INFO, this.getClass().getName(),
            "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_got_port_from_queue__INFO",
            new Object[] { Thread.currentThread().getName(), (String) portObject });
  }
  return (String) portObject;
}
/**
 * Shuts down the local VNS. When a {@code vns} instance exists it is asked to shut down and the
 * reference to {@code localVNSThread} is cleared; otherwise nothing happens.
 *
 * @throws CasProcessorDeploymentException
 *           declared by the overridden method; nothing in this body throws it directly
 */
@Override
public void undeploy() throws CasProcessorDeploymentException {
  if (vns == null) {
    return; // no local VNS was started, nothing to stop
  }
  vns.shutdown();
  localVNSThread = null;
}
/*
 * (non-Javadoc)
 *
 * Per-URL undeploy is a no-op in this deployer: the method body is empty and aURL is ignored.
 *
 * @see CasProcessorDeployer#undeploy(URL) -- presumably the overridden declaration; confirm
 *      against the interface
 */
@Override
public void undeploy(URL aURL) throws CasProcessorDeploymentException {
  // intentionally empty
}
}