blob: 4dc9ed26b5a4bebafaca6edc903b4b0b7291557a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.sm;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Map;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccProperties;
import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
/**
* Represent a single instance.
*
* This is a simple class, mostly just a container for the state machine.
*/
class ServiceInstance
implements SmConstants
{
private DuccLogger logger = DuccLogger.getLogger(this.getClass().getName(), COMPONENT_NAME);
long numeric_id; // unique numeric ducc-assigned id
long share_id; // RM's share ID for this instance
int instance_id = 0; // unique and constant ID assigned by SM to this instance
// which allows services to know "which" instance they are
// UIMA-4258
String host; // Where the instance is scheduled
ServiceSet sset; // handle to the service definitiopn
JobState state = JobState.Undefined; // orchestartor state
String user = null;
boolean stopped; // careful .. this means it was stopped by a stop order from somewhere,
// NOT that it's terminating
String ducc_home = System.getProperty("DUCC_HOME");
String api_classpath = ducc_home + "/lib/uima-ducc-cli.jar" + ":" + System.getProperty("java.class.path");
ServiceInstance(ServiceSet sset)
{
this.numeric_id = -1;
this.sset = sset;
this.stopped = true;
this.share_id = -1;
this.host = "<unknown>";
}
// UIMA-4258
public int getInstanceId()
{
return instance_id;
}
// UIMA-4258
public void setInstanceId(int id)
{
this.instance_id = id;
}
public long getId() {
return this.numeric_id;
}
void setId(long id) {
this.numeric_id = id;
}
public long getShareId()
{
return share_id;
}
public String getHost()
{
return host;
}
void setUser(String user)
{
this.user = user;
}
public void setState(JobState state)
{
this.state = state;
}
public JobState getState()
{
return this.state;
}
/**
* Stopped by some stop order?
*/
public synchronized boolean isStopped()
{
return this.stopped;
}
/**
* On it's way up, or already up, and not stopped for any reason.
*/
public synchronized boolean isRunning()
{
// String methodName = "setState";
// Received, // Job has been vetted, persisted, and assigned unique Id
// WaitingForDriver, // Process Manager is launching Job Driver
// WaitingForServices, // Service Manager is checking/starting services for Job
// WaitingForResources, // Scheduler is assigning resources to Job
// Initializing, // Process Agents are initializing pipelines
// Running, // At least one Process Agent has reported process initialization complete
// Completing, // Job processing is completing
// Completed, // Job processing is completed
// Undefined // None of the above
switch ( state ) {
case Completing:
case Completed:
return false;
default:
return !isStopped();
}
}
synchronized void update(long share_id, String host)
{
this.share_id = share_id;
this.host = host;
}
synchronized void setStopped(boolean s)
{
this.stopped = s;
}
// void setState(DuccWorkJob dwj)
// {
// this.state = dwj.getJobState();
// }
long start(String spec, DuccProperties meta_props)
{
String methodName = "start";
logger.info(methodName, sset.getId(), "START INSTANCE");
setStopped(false);
this.user = meta_props.getProperty("user");
// Simple use of ducc_ling, just submit as the user. The specification will have the working directory
// and classpath needed for the service, handled by the Orchestrator and Job Driver.
String[] args = {
System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
"-u",
user,
"--",
System.getProperty("ducc.jvm"),
"-cp",
api_classpath,
"org.apache.uima.ducc.cli.DuccServiceSubmit",
"--specification",
spec,
"--service_id",
sset.getId().toString(),
};
for ( int i = 0; i < args.length; i++ ) {
if ( i > 0 && (args[i-1].equals("-cp") ) ) {
// The classpaths can be just awful filling the logs with junk. It will end up in the agent log
// anyway so let's inhibit it here.
logger.debug(methodName, sset.getId(), "Args[", i, "]: <CLASSPATH>");
} else {
logger.debug(methodName, sset.getId(), "Args[", i, "]:", args[i]);
}
}
ProcessBuilder pb = new ProcessBuilder(args);
Map<String, String> env = pb.environment();
env.put("DUCC_HOME", System.getProperty("DUCC_HOME"));
env.put("DUCC_SERVICE_INSTANCE", Integer.toString(instance_id)); // UIMA-4258
ArrayList<String> stdout_lines = new ArrayList<String>();
ArrayList<String> stderr_lines = new ArrayList<String>();
try {
Process p = pb.start();
int rc = p.waitFor();
logger.debug(methodName, null, "DuccServiceSubmit returns with rc", rc);
// TODO: we should attach these streams to readers in threads because too much output
// can cause blocking, deadlock, ugliness.
InputStream stdout = p.getInputStream();
InputStream stderr = p.getErrorStream();
BufferedReader stdout_reader = new BufferedReader(new InputStreamReader(stdout));
BufferedReader stderr_reader = new BufferedReader(new InputStreamReader(stderr));
String line = null;
while ( (line = stdout_reader.readLine()) != null ) {
stdout_lines.add(line);
}
line = null;
while ( (line = stderr_reader.readLine()) != null ) {
stderr_lines.add(line);
}
} catch (Throwable t) {
logger.error(methodName, sset.getId(), t);
sset.setErrorString(t.toString());
return -1;
}
for ( String s : stderr_lines ) {
logger.info(methodName, sset.getId(), "Start stderr:", s);
}
// That was annoying. Now search the lines for some hint of the id.
boolean inhibit_cp = false;
boolean started = false;
StringBuffer submit_buffer = new StringBuffer();
boolean recording = false;
for ( String s : stdout_lines ) {
// simple logic to inhibit printing the danged classpath
if ( inhibit_cp ) {
inhibit_cp = false;
logger.info(methodName, sset.getId(), "<INHIBITED CP>");
} else {
logger.info(methodName, sset.getId(), "Start stdout:", s);
}
if ( s.indexOf("-cp") >= 0 ) {
inhibit_cp = true;
}
if ( recording ) {
submit_buffer.append(s.trim());
submit_buffer.append(";");
}
if ( s.startsWith("1001 Command launching...") ) {
recording = true;
continue;
}
// e.g. Service instance 18803 submitted
if ( s.startsWith("Service") && s.endsWith("submitted") ) {
String[] toks = s.split("\\s");
try {
numeric_id = Long.parseLong(toks[2]);
started = true;
logger.info(methodName, null, "Request to start service " + sset.getId().toString() + " accepted as service instance ", numeric_id);
} catch ( NumberFormatException e ) {
sset.setErrorString("Request to start service " + sset.getId().toString() + " failed, can't interpret submit response.: " + s);
logger.warn(methodName, null, "Request to start service " + sset.getId().toString() + " failed, can't interpret response.: " + s);
}
}
}
if ( ! started ) {
logger.warn(methodName, sset.getId(), "Request to start service " + sset.getId().toString() + " failed.");
meta_props.put("submit-error", submit_buffer.toString());
sset.log_errors(stdout_lines, stderr_lines);
} else {
meta_props.remove("submit-error");
state = JobState.Received;
}
logger.info(methodName, sset.getId(), "START INSTANCE COMPLETE");
return numeric_id;
}
/**
* This assumes the caller has already verified that I'm a registered service.
*/
void stop()
{
String methodName = "stop";
setStopped(true);
String[] args = {
System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
"-u",
user,
"--",
System.getProperty("ducc.jvm"),
"-cp",
api_classpath,
"org.apache.uima.ducc.cli.DuccServiceCancel",
"--id",
Long.toString(numeric_id),
};
for ( int i = 0; i < args.length; i++ ) {
if ( i > 0 && (args[i-1].equals("-cp") ) ) {
// The classpaths can be just awful filling the logs with junk. It will end up in the agent log
// anyway so let's inhibit it here.
logger.debug(methodName, sset.getId(), "Instance", numeric_id, "Args[", i, "]: <CLASSPATH>");
} else {
logger.debug(methodName, sset.getId(), "Instance", numeric_id, "Args[", i, "]:", args[i]);
}
}
ProcessBuilder pb = new ProcessBuilder(args);
Map<String, String> env = pb.environment();
env.put("DUCC_HOME", System.getProperty("DUCC_HOME"));
ArrayList<String> stdout_lines = new ArrayList<String>();
ArrayList<String> stderr_lines = new ArrayList<String>();
int rc = 0;
try {
Process p = pb.start();
rc = p.waitFor();
logger.info(methodName, sset.getId(), "DuccServiceCancel returns with rc", rc);
if (logger.isTrace() || (rc != 0)) {
InputStream stdout = p.getInputStream();
InputStream stderr = p.getErrorStream();
BufferedReader stdout_reader = new BufferedReader(new InputStreamReader(stdout));
BufferedReader stderr_reader = new BufferedReader(new InputStreamReader(stderr));
String line = null;
while ( (line = stdout_reader.readLine()) != null ) {
stdout_lines.add(line);
}
line = null;
while ( (line = stderr_reader.readLine()) != null ) {
stderr_lines.add(line);
}
}
} catch (Throwable t) {
// TODO Auto-generated catch block
logger.error(methodName, null, t);
}
if ( logger.isTrace() || ( rc != 0) ) {
boolean inhibit_cp = false;
for ( String s : stdout_lines ) {
// simple logic to inhibit printing the danged classpath
if ( inhibit_cp ) {
inhibit_cp = false;
logger.info(methodName, sset.getId(), "Instance", numeric_id, "<INHIBITED CP>");
} else {
logger.info(methodName, sset.getId(), "Instance", numeric_id, "Stop stdout:", s);
}
if ( s.indexOf("-cp") >= 0 ) {
inhibit_cp = true;
}
}
for ( String s : stderr_lines ) {
logger.info(methodName, sset.getId(), "Instance", numeric_id, "Stop stderr:", s);
}
}
}
}