blob: 2af44109b46c21184dc607c3e76a8651ba80558f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.sm;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import org.apache.uima.ducc.common.IDuccUser;
import org.apache.uima.ducc.common.persistence.services.IStateServices;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccProperties;
import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
import org.apache.uima.ducc.user.common.QuotedOptions;
/**
* Represent a single instance.
*
* This is a simple class, mostly just a container for the state machine.
*/
class ServiceInstance
implements SmConstants
{
private DuccLogger logger = DuccLogger.getLogger(this.getClass().getName(), COMPONENT_NAME);
long numeric_id; // unique numeric ducc-assigned id
long share_id; // RM's share ID for this instance
int instance_id = 0; // unique and constant ID assigned by SM to this instance
// which allows services to know "which" instance they are
// UIMA-4258
String host; // Where the instance is scheduled
ServiceSet sset; // handle to the service definitiopn
JobState state = JobState.Undefined; // orchestartor state
String user = null;
boolean stopped; // careful .. this means it was stopped by a stop order from somewhere,
// NOT that it's terminating
String ducc_home = System.getProperty(IDuccUser.EnvironmentVariable.DUCC_HOME.value());
String api_classpath = ducc_home + "/lib/uima-ducc-cli.jar" + ":" + System.getProperty("java.class.path");
ServiceInstance(ServiceSet sset)
{
this.numeric_id = -1;
this.sset = sset;
this.stopped = true;
this.share_id = -1;
this.host = "<unknown>";
}
// UIMA-4258
public int getInstanceId()
{
return instance_id;
}
// UIMA-4258
public void setInstanceId(int id)
{
this.instance_id = id;
}
public long getId() {
return this.numeric_id;
}
void setId(long id) {
this.numeric_id = id;
}
public long getShareId()
{
return share_id;
}
public String getHost()
{
return host;
}
void setUser(String user)
{
this.user = user;
}
public void setState(JobState state)
{
this.state = state;
}
public JobState getState()
{
return this.state;
}
/**
* Stopped by some stop order?
*/
public synchronized boolean isStopped()
{
return this.stopped;
}
/**
* On it's way up, or already up, and not stopped for any reason.
*/
public synchronized boolean isRunning()
{
// String methodName = "setState";
// Received, // Job has been vetted, persisted, and assigned unique Id
// WaitingForDriver, // Process Manager is launching Job Driver
// WaitingForServices, // Service Manager is checking/starting services for Job
// WaitingForResources, // Scheduler is assigning resources to Job
// Initializing, // Process Agents are initializing pipelines
// Running, // At least one Process Agent has reported process initialization complete
// Completing, // Job processing is completing
// Completed, // Job processing is completed
// Undefined // None of the above
switch ( state ) {
case Completing:
case Completed:
return false;
default:
return !isStopped();
}
}
synchronized void update(long share_id, String host)
{
this.share_id = share_id;
this.host = host;
}
synchronized void setStopped(boolean s)
{
this.stopped = s;
}
// void setState(DuccWorkJob dwj)
// {
// this.state = dwj.getJobState();
// }
String[] genArgs(DuccProperties props)
{
List<String> args = new ArrayList<String>();
args.add(System.getProperty("ducc.agent.launcher.ducc_spawn_path"));
args.add("-u");
args.add(user);
args.add("--");
args.add(System.getProperty("ducc.jvm"));
args.add("-cp");
args.add(api_classpath);
args.add("org.apache.uima.ducc.cli.DuccServiceSubmit");
args.add("--service_id");
args.add(sset.getId().toString());
@SuppressWarnings("rawtypes")
Enumeration keys = props.propertyNames();
while ( keys.hasMoreElements() ) {
String k = (String) keys.nextElement();
// System.out.println("------ Set argument " + k + " to " + ((String)props.get(k)));
String v = (String) props.get(k);
args.add("--" + k);
if (!k.equals("debug")) { // Only debug has no value
args.add(v);
}
}
return args.toArray(new String[args.size()]);
}
ArrayList<String> stdout_lines = new ArrayList<String>();
ArrayList<String> stderr_lines = new ArrayList<String>();
long start(DuccProperties svc_props, DuccProperties meta_props)
{
String methodName = "start";
logger.info(methodName, sset.getId(), "START INSTANCE");
setStopped(false);
this.user = meta_props.getProperty(IStateServices.SvcMetaProps.user.pname());
// Simple use of ducc_ling, just submit as the user. The specification will have the working directory
// and classpath needed for the service, handled by the Orchestrator and Job Driver.
String[] args = genArgs(svc_props);
for ( int i = 0; i < args.length; i++ ) {
if ( i > 0 && (args[i-1].equals("-cp") ) ) {
// The classpaths can be just awful filling the logs with junk. It will end up in the agent log
// anyway so let's inhibit it here.
logger.debug(methodName, sset.getId(), "Args[", i, "]: <CLASSPATH>");
} else {
logger.debug(methodName, sset.getId(), "Args[", i, "]:", args[i]);
}
}
ProcessBuilder pb = new ProcessBuilder(args);
StdioListener sin_listener = null;
StdioListener ser_listener = null;
Map<String, String> env = pb.environment();
env.put(IDuccUser.EnvironmentVariable.DUCC_HOME.value(), System.getProperty(IDuccUser.EnvironmentVariable.DUCC_HOME.value()));
env.put(IDuccUser.EnvironmentVariable.DUCC_ID_SERVICE.value(), Integer.toString(instance_id)); // UIMA-4258
// for runmode = Test
String runmode = DuccPropertiesResolver.get(DuccPropertiesResolver.ducc_runmode);
if(runmode != null) {
if(runmode.equals("Test")) {
env.put(IDuccUser.EnvironmentVariable.USER.value(), this.user);
}
}
// Extract the DUCC_UMASK setting and put it ducc_ling's environment UIMA-5328
// Could use QuotedOprtions to build a map but since we want just one ...
// ArrayList<String> envVarList = QuotedOptions.tokenizeList(environment, true);
final String umaskKey = "DUCC_UMASK";
String envValue = svc_props.getProperty(IStateServices.SvcRegProps.environment.pname());
if (envValue != null) {
List<String> envList = Arrays.asList(envValue.split("\\s+")); // No need to strip quotes ... !?
Map<String, String> envMap = QuotedOptions.parseAssignments(envList, 0);
String umask = envMap.get(umaskKey);
if (umask != null) {
env.put(umaskKey, umask);
}
}
try {
Process p = pb.start();
InputStream stdout = p.getInputStream();
InputStream stderr = p.getErrorStream();
sin_listener = new StdioListener(1, stdout);
ser_listener = new StdioListener(2, stderr);
Thread sol = new Thread(sin_listener);
Thread sel = new Thread(ser_listener);
sol.start();
sel.start();
int rc = p.waitFor();
logger.debug(methodName, null, "DuccServiceSubmit returns with rc", rc);
sin_listener.stop();
ser_listener.stop();
} catch (Throwable t) {
logger.error(methodName, sset.getId(), t);
try {
sset.setErrorString(t.toString());
} catch ( Exception e ) {
logger.warn(methodName, sset.getId(), "Error updating meta properties:", e);
}
return -1;
}
for ( String s : stderr_lines ) {
logger.info(methodName, sset.getId(), "Start stderr:", s);
}
// That was annoying. Now search the lines for some hint of the id.
boolean inhibit_cp = false;
boolean started = false;
StringBuffer submit_buffer = new StringBuffer();
boolean recording = false;
for ( String s : stdout_lines ) {
// simple logic to inhibit printing the danged classpath
if ( inhibit_cp ) {
inhibit_cp = false;
logger.info(methodName, sset.getId(), "<INHIBITED CP>");
} else {
logger.info(methodName, sset.getId(), "Start stdout:", s);
}
if ( s.indexOf("-cp") >= 0 ) {
inhibit_cp = true;
}
if ( recording ) {
submit_buffer.append(s.trim());
submit_buffer.append(";");
}
if ( s.startsWith("1001 Command launching...") ) {
recording = true;
continue;
}
// e.g. Service instance 18803 submitted
if ( s.startsWith("Service") && s.endsWith("submitted") ) {
String[] toks = s.split("\\s");
try {
numeric_id = Long.parseLong(toks[2]);
started = true;
logger.info(methodName, null, "Request to start service " + sset.getId().toString() + " accepted as service instance ", numeric_id);
} catch ( NumberFormatException e ) {
try {
sset.setErrorString("Request to start service " + sset.getId().toString() + " failed, can't interpret submit response.: " + s);
} catch ( Exception ee ) {
logger.warn(methodName, sset.getId(), "Error updating meta properties:", ee);
}
logger.warn(methodName, null, "Request to start service " + sset.getId().toString() + " failed, can't interpret response.: " + s);
}
}
}
if ( ! started ) {
logger.warn(methodName, sset.getId(), "Request to start service " + sset.getId().toString() + " failed.");
meta_props.put(IStateServices.SvcMetaProps.submit_error.pname(), submit_buffer.toString());
sset.log_errors(stdout_lines, stderr_lines);
} else {
meta_props.remove(IStateServices.SvcMetaProps.submit_error.pname());
state = JobState.Received;
}
logger.info(methodName, sset.getId(), "START INSTANCE COMPLETE");
stdout_lines.clear();
stderr_lines.clear();
return numeric_id;
}
/**
* This assumes the caller has already verified that I'm a registered service.
*/
void stop()
{
String methodName = "stop";
/*
* Accommodate use of service id and endpoint for information purposes,
* which informs system event logger when canceling the instance.
*/
String service_id = sset.getId().toString();
String service_endpoint = sset.getEndpoint();
setStopped(true);
String[] args = {
System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
"-u",
user,
"--",
System.getProperty("ducc.jvm"),
"-cp",
api_classpath,
"org.apache.uima.ducc.cli.DuccServiceCancel",
"--id",
Long.toString(numeric_id),
"--service_id",
service_id,
"--service_request_endpoint",
service_endpoint,
};
for ( int i = 0; i < args.length; i++ ) {
if ( i > 0 && (args[i-1].equals("-cp") ) ) {
// The classpaths can be just awful filling the logs with junk. It will end up in the agent log
// anyway so let's inhibit it here.
logger.debug(methodName, sset.getId(), "Instance", numeric_id, "Args[", i, "]: <CLASSPATH>");
} else {
logger.debug(methodName, sset.getId(), "Instance", numeric_id, "Args[", i, "]:", args[i]);
}
}
ProcessBuilder pb = new ProcessBuilder(args);
Map<String, String> env = pb.environment();
env.put(IDuccUser.EnvironmentVariable.DUCC_HOME.value(), System.getProperty(IDuccUser.EnvironmentVariable.DUCC_HOME.value()));
// for runmode = Test
String runmode = DuccPropertiesResolver.get(DuccPropertiesResolver.ducc_runmode);
if(runmode != null) {
if(runmode.equals("Test")) {
env.put(IDuccUser.EnvironmentVariable.USER.value(), this.user);
}
}
pb.redirectOutput(new File("/dev/null"));
pb.redirectError(new File("/dev/null"));
int rc = 0;
try {
Process p = pb.start();
rc = p.waitFor();
logger.info(methodName, sset.getId(), "DuccServiceCancel returns with rc", rc);
} catch (Throwable t) {
logger.error(methodName, null, t);
}
}
class StdioListener
implements Runnable
{
InputStream in;
String tag;
boolean done = false;
int which = 0;
boolean ignore = false;
StdioListener(int which, InputStream in, boolean ignore)
{
this.in = in;
this.which = which;
switch ( which ) {
case 1: tag = "STDOUT: "; break;
case 2: tag = "STDERR: "; break;
}
this.ignore = ignore;
this.ignore = ignore;
}
StdioListener(int which, InputStream in)
{
this(which, in, false);
}
void stop()
{
this.done = true;
}
public void run()
{
long tid = Thread.currentThread().getId();
String methodName = "SvcSubmit[" + tid + "]";
BufferedReader br = new BufferedReader(new InputStreamReader(in));
while ( true ) {
try {
if ( done ) return;
String s = br.readLine();
if ( logger.isTrace() ) {
logger.trace(methodName, sset.getId(), "[]", tag, s);
}
if ( s == null ) {
String msg = tag + "closed, listener returns";
logger.info(methodName, sset.getId(), msg);
return;
}
if ( ignore ) continue; // just discarding it
switch ( which ) {
case 1:
stdout_lines.add(s);
break;
case 2:
stderr_lines.add(s);
break;
}
} catch (Exception e) {
// if anything goes wrong this guy is toast.
logger.error(methodName, sset.getId(), e);
return;
}
}
}
}
}