blob: 4feeb3ad64de18ee44a471ae000c12eff23d5430 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.sm;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import org.apache.uima.ducc.cli.AServicePing;
import org.apache.uima.ducc.cli.UimaAsPing;
import org.apache.uima.ducc.common.IServiceStatistics;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccProperties;
/**
* This runs the watchdog thread for custom service pingers.
*
* It spawns a process, as the user, which in turn will instantiate an object which extends
* AServiceMeta to implement the pinger.
*
* The processes communicate via a pipe: every ping interval the meta puts relevent information onto its
* stdout:
* 0|1 long long
* The first token is 1 if the ping succeeded, 0 otherwise.
* The second token is the total cumulative work executed by the service.
* The third token is the current queue depth of the service.
*/
class PingDriver
implements IServiceMeta, // (extends runnable )
SmConstants
{
/**
*
*/
private static final long serialVersionUID = 1L;
private DuccLogger logger = DuccLogger.getLogger(this.getClass().getName(), COMPONENT_NAME);
String[] jvm_args;
String endpoint;
String ping_class;
String ping_arguments;
String classpath;
boolean ping_ok;
int missed_pings = 0; // didn't ping in specified time, but no error thrown
int errors = 0; // error, no good
int error_threshold = 5; // max errors before we die
ServiceSet sset;
boolean test_mode = false;
Process ping_main;
StdioListener sin_listener = null;
StdioListener ser_listener = null;
PingThread pinger = null;
int meta_ping_rate; // ducc.properties configured ping rate
int meta_ping_stability; // ducc.properties number of missed pings before setting service unresponive
String meta_ping_timeout; // job.properties configured time to wait for ping to return in ms
Thread ping_thread; // thread to manage external process pingers
boolean internal_ping = true; // if true, use default UIMA-AS pinger in thread inside SM propert
AServicePing internal_pinger = null; // pinger used if internal_ping is true
IServiceStatistics service_statistics = null;
String user;
String working_directory;
String log_directory;
boolean do_log = true;
boolean shutdown = false;
PingDriver(ServiceSet sset)
{
this.sset = sset;
DuccProperties job_props = sset.getJobProperties();
DuccProperties meta_props = sset.getMetaProperties();
this.endpoint = meta_props.getStringProperty("endpoint");
this.user = meta_props.getStringProperty("user");
String jvm_args_str = job_props.getStringProperty("service_ping_jvm_args", "");
this.ping_class = job_props.getStringProperty("service_ping_class", null);
this.ping_arguments = job_props.getStringProperty("service_ping_arguments", null);
if ( (ping_class == null) || ping_class.equals(UimaAsPing.class.getName()) ) {
internal_ping = true;
} else {
internal_ping = false;
this.meta_ping_timeout = job_props.getStringProperty("service_ping_timeout");
this.do_log = job_props.getBooleanProperty("service_ping_dolog", true);
this.classpath = job_props.getStringProperty("service_ping_classpath", System.getProperty("java.class.path"));
this.working_directory = job_props.getStringProperty("working_directory");
this.log_directory = job_props.getStringProperty("log_directory");
}
jvm_args_str = jvm_args_str + " -Dducc.sm.meta.ping.timeout=" + meta_ping_timeout;
jvm_args_str = jvm_args_str.trim();
jvm_args = jvm_args_str.split("\\s+");
this.meta_ping_rate = ServiceManagerComponent.meta_ping_rate;
this.meta_ping_stability = ServiceManagerComponent.meta_ping_stability;
}
/**
* Test from main only
*/
PingDriver(String props)
{
DuccProperties dp = new DuccProperties();
try {
dp.load(props);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
this.endpoint = dp.getStringProperty("endpoint");
String jvm_args_str = dp.getStringProperty("service_ping_jvm_args", "");
this.ping_class = dp.getStringProperty("service_ping_class");
this.classpath = dp.getStringProperty("service_ping_classpath");
jvm_args = jvm_args_str.split(" ");
this.test_mode = true;
}
public IServiceStatistics getServiceStatistics()
{
return service_statistics;
}
synchronized int getMetaPingRate()
{
return meta_ping_rate;
}
public void run()
{
String methodName = "run";
if ( internal_ping ) {
// This is the default ping driver, as configured in ducc.propeties, to be run in
// an in-process thread
logger.info(methodName, sset.getId(), "Starting INTERNAL ping.");
runAsThread();
logger.info(methodName, sset.getId(), "Ending INTERNAL ping.");
} else {
// The user specified a pinger, run it as an extranal process under that user's identity
logger.info(methodName, sset.getId(), "Starting EXTERNAL ping.");
runAsProcess();
logger.info(methodName, sset.getId(), "Ending EXTERNAL ping.");
}
}
void handleStatistics(IServiceStatistics stats)
{
String methodName = "handleStatistics";
this.service_statistics = stats;
if ( stats == null ) {
logger.error(methodName, sset.getId(), "Service statics are null!");
errors++;
} else {
if ( service_statistics.isAlive() ) {
synchronized(this) {
sset.setResponsive();
}
logger.info(methodName, sset.getId(), "Ping ok: ", endpoint, stats.toString());
missed_pings = 0;
} else {
logger.error(methodName, sset.getId(), "Missed_pings ", ++missed_pings, "endpoint", endpoint, stats.toString());
if ( missed_pings > meta_ping_stability ) {
sset.setUnresponsive();
logger.info(methodName, sset.getId(), "Seting state to unresponsive, endpoint",endpoint);
} else if ( missed_pings > (meta_ping_stability / 2) ) {
sset.setWaiting();
logger.info(methodName, sset.getId(), "Seting state to waiting, endpoint,", endpoint);
}
}
}
}
public void runAsThread()
{
String methodName = "runAsThread";
internal_pinger = new UimaAsPing(logger);
try {
internal_pinger.init(ping_arguments, endpoint);
} catch ( Throwable t ) {
logger.warn(methodName, sset.getId(), t);
sset.pingExited();
}
while ( ! shutdown ) {
handleStatistics(internal_pinger.getStatistics());
if ( errors > error_threshold ) {
internal_pinger.stop();
logger.warn(methodName, sset.getId(), "Ping exited because of excess errors: ", errors);
sset.pingExited();
}
try {
Thread.sleep(meta_ping_rate);
} catch (InterruptedException e) {
// nothing, if we were shutdown we'll exit anyway, otherwise who cares
}
}
}
public void runAsProcess()
{
String methodName = "run";
try {
pinger = new PingThread();
} catch ( Throwable t ) {
logger.error(methodName, sset.getId(), "Cannot start listen socket, pinger not started.", t);
sset.setUnresponsive();
return;
}
int port = pinger.getPort();
ping_thread = new Thread(pinger);
ping_thread.start(); // sets up the listener, before we start the the external process
ArrayList<String> arglist = new ArrayList<String>();
if ( ! test_mode ) {
arglist.add(System.getProperty("ducc.agent.launcher.ducc_spawn_path"));
arglist.add("-u");
arglist.add(user);
arglist.add("-w");
arglist.add(working_directory);
if ( do_log ) {
arglist.add("-f");
arglist.add(log_directory + "/services/ping/" + sset.getId());
}
arglist.add("--");
}
arglist.add(System.getProperty("ducc.jvm"));
for ( String s : jvm_args) {
arglist.add(s);
}
arglist.add("-cp");
arglist.add(System.getProperty("java.class.path") + ":" + classpath);
arglist.add("org.apache.uima.ducc.sm.ServicePingMain");
arglist.add("--class");
arglist.add(ping_class);
arglist.add("--endpoint");
arglist.add(endpoint);
arglist.add("--port");
if( ping_arguments != null ) {
arglist.add("--arguments");
arglist.add(ping_arguments);
}
arglist.add(Integer.toString(port));
int i = 0;
for ( String s : arglist) {
logger.debug(methodName, sset.getId(), "Args[", i++,"]: ", s);
}
ProcessBuilder pb = new ProcessBuilder(arglist);
//
// Establish our pinger
//
InputStream stdout = null;
InputStream stderr = null;
try {
ping_main = pb.start();
stdout = ping_main.getInputStream();
stderr = ping_main.getErrorStream();
sin_listener = new StdioListener(1, stdout);
ser_listener = new StdioListener(2, stderr);
Thread sol = new Thread(sin_listener);
Thread sel = new Thread(ser_listener);
sol.start();
sel.start();
} catch (Throwable t) {
logger.error(methodName, sset.getId(), "Cannot establish ping process:", t);
sset.setUnresponsive();
return;
}
int rc;
while ( true ) {
try {
rc = ping_main.waitFor();
logger.debug(methodName, sset.getId(), "Pinger returns rc ", rc);
sset.pingExited();
break;
} catch (InterruptedException e2) {
// nothing
}
}
pinger.stop();
sin_listener.stop();
ser_listener.stop();
}
public void stop()
{
shutdown = true;
if ( !internal_ping ) {
if ( pinger != null ) pinger.stop();
if ( sin_listener != null ) sin_listener.stop();
if ( ser_listener != null ) ser_listener.stop();
if ( ping_main != null ) ping_main.destroy();
}
}
class PingThread
implements Runnable
{
ServerSocket server;
int port = -1;
boolean done = false;
PingThread()
throws IOException
{
this.server = new ServerSocket(0);
this.port = server.getLocalPort();
}
int getPort()
{
return this.port;
}
synchronized void stop()
{
this.done = true;
}
public void run()
{
String methodName = "PingThread.run()";
try {
Socket sock = server.accept();
// Socket sock = new Socket("localhost", port);
sock.setSoTimeout(5000);
OutputStream out = sock.getOutputStream();
InputStream in = sock.getInputStream();
ObjectInputStream ois = new ObjectInputStream(in);
ping_ok = false; // we expect the callback to change this
while ( true ) {
synchronized(this) {
if ( done ) {
// Ask for the ping
try {
logger.trace(methodName, sset.getId(), "PingDriver: ping QUIT");
out.write('Q');
out.flush();
} catch (IOException e1) {
logger.error(methodName, sset.getId(), e1);
errors++;
}
ois.close();
out.close();
in.close();
return;
}
}
if ( errors > error_threshold ) {
stop();
}
// Ask for the ping
try {
logger.trace(methodName, sset.getId(), "PingDriver: ping OUT");
out.write('P');
out.flush();
} catch (IOException e1) {
logger.error(methodName, sset.getId(), e1);
errors++;
}
// Try to read the response
handleStatistics((IServiceStatistics) ois.readObject());
// Wait a bit for the next one
try {
// logger.info(methodName, sset.getId(), "SLEEPING", my_ping_rate, "ms", sset.toString());
Thread.sleep(meta_ping_rate);
// logger.info(methodName, sset.getId(), "SLEEP returns", sset.toString());
} catch (InterruptedException e) {
// nothing
}
}
} catch (IOException e) {
logger.error(methodName, sset.getId(), "Error receiving ping", e);
errors++;
} catch (ClassNotFoundException e) {
logger.error(methodName, sset.getId(), "Input garbled:", e);
errors++;
}
}
}
class StdioListener
implements Runnable
{
InputStream in;
String tag;
boolean done = false;
StdioListener(int which, InputStream in)
{
this.in = in;
switch ( which ) {
case 1: tag = "STDOUT: "; break;
case 2: tag = "STDERR: "; break;
}
}
void stop()
{
this.done = true;
}
public void run()
{
if ( done ) return;
String methodName = "StdioListener.run";
BufferedReader br = new BufferedReader(new InputStreamReader(in));
while ( true ) {
try {
String s = br.readLine();
if ( test_mode ) System.out.println(tag + s);
else logger.info(methodName, sset.getId(), tag, s);
if ( s == null ) {
String msg = tag + "closed, listener returns";
if ( test_mode ) System.out.println(msg);
else logger.info(methodName, sset.getId(), msg);
return;
}
} catch (IOException e) {
// if anything goes wrong this guy is toast.
if ( test_mode) e.printStackTrace();
else logger.error(methodName, sset.getId(), e);
return;
}
}
}
}
public static void main(String[] args)
{
// arg0 = amqurl = put into -Dbroker.url
// arg1 = endpoint - pass to ServicePingMain
// call ServicePingMain --class org.apache.uima.ducc.sm.PingTester --endpoint FixedSleepAE_1
// make sure test.jar is in the classpath
PingDriver csm = new PingDriver(args[0]);
csm.run();
}
}