blob: 77249a5bb0348c56793fc3c8d9aa9d2941b41a6f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.sm;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.uima.ducc.cli.AServicePing;
import org.apache.uima.ducc.cli.IUiOptions.UiOption;
import org.apache.uima.ducc.common.IServiceStatistics;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccProperties;
import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
import org.apache.uima.ducc.common.utils.QuotedOptions;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
import org.apache.uima.ducc.transport.event.sm.IService.ServiceState;
/**
* This runs the watchdog thread for custom service pingers.
*
* It spawns a process, as the user, which in turn will instantiate an object which extends
* AServiceMeta to implement the pinger.
*
* The processes communicate via a pipe: every ping interval the meta puts relevent information onto its
* stdout:
* 0|1 long long
* The first token is 1 if the ping succeeded, 0 otherwise.
* The second token is the total cumulative work executed by the service.
* The third token is the current queue depth of the service.
*/
class PingDriver
implements IServiceMeta, // (extends runnable )
SmConstants
{
/**
*
*/
private DuccLogger logger = DuccLogger.getLogger(this.getClass().getName(), COMPONENT_NAME);
String[] jvm_args;
String endpoint;
String ping_class;
String ping_arguments;
String classpath;
boolean ping_ok;
int max_instances;
int missed_pings = 0; // didn't ping in specified time, but no error thrown
int errors = 0; // error, no good
int error_threshold = 5; // max errors before we die
ServiceSet sset;
boolean test_mode = false;
Process ping_main;
StdioListener sin_listener = null;
StdioListener ser_listener = null;
PingThread pinger = null;
int meta_ping_rate; // ducc.properties configured ping rate
int meta_ping_stability; // ducc.properties number of missed pings before setting service unresponive
long meta_ping_timeout; // how long we wait for pinger to return when requesing a ping
Thread ping_thread; // thread to manage external process pingers
boolean internal_ping = true; // if true, use default UIMA-AS pinger in thread inside SM propert
int failure_max;
int failure_window;
long service_statistics_timestamp = -1;
IServiceStatistics service_statistics = null;
String user;
String working_directory;
String log_directory;
String environment;
boolean do_log = true;
boolean shutdown = false;
PingStopper pingStopper = null;
Timer timer = null;
ServiceState pingState = ServiceState.Waiting;
DuccProperties meta_props;
PingDriver(ServiceSet sset)
{
this.sset = sset;
DuccProperties job_props = sset.getJobProperties();
meta_props = sset.getMetaProperties();
// establish the default pinger, then see if another pinger is specified and set it.
this.ping_class = System.getProperty("ducc.sm.default.monitor.class", "org.apache.uima.ducc.cli.UimaAsPing");
this.ping_class = job_props.getStringProperty(UiOption.ServicePingClass.pname(), this.ping_class);
// If the pinger is registered with us we can pick up (and trust) the registered defaults. Read the registration now.
DuccProperties ping_props = findRegisteredPinger(this.ping_class);
if ( ping_props == null ) { // this is an internal or system error of some sort
throw new IllegalStateException("Cannot start pinger.");
} else {
this.internal_ping = ping_props.getBooleanProperty("internal", false);
// One more resolution, in case the class name is not actually the name of a registered pinger
String real_class = ping_props.getProperty("service_ping_class");
if ( real_class != null ) {
this.ping_class = real_class;
}
logger.info("<ctr>", sset.getId(), "Using ping class", this.ping_class);
}
this.endpoint = meta_props.getStringProperty("endpoint");
this.user = meta_props.getStringProperty("user");
this.max_instances = Integer.parseInt(System.getProperty("ducc.sm.max.instances", "10"));
this.ping_arguments = resolveStringProperty (UiOption.ServicePingArguments.pname() , ping_props, job_props, null);
String jvm_args_str = resolveStringProperty (UiOption.ServicePingJvmArgs.pname() , ping_props, job_props, "");
this.meta_ping_timeout = resolveIntProperty (UiOption.ServicePingTimeout.pname() , ping_props, job_props, ServiceManagerComponent.meta_ping_timeout);
this.do_log = resolveBooleanProperty(UiOption.ServicePingDoLog.pname() , ping_props, job_props, false);
this.classpath = resolveStringProperty (UiOption.ServicePingClasspath.pname() , ping_props, job_props, System.getProperty("java.class.path"));
this.working_directory = resolveStringProperty (UiOption.WorkingDirectory.pname() , ping_props, job_props, null); // cli always puts this into job props, no default
this.log_directory = resolveStringProperty (UiOption.LogDirectory.pname() , ping_props, job_props, null); // cli always puts this into job props, no default
this.failure_window = resolveIntProperty (UiOption.InstanceFailureWindow.pname(), ping_props, job_props, ServiceManagerComponent.failure_window);
this.failure_max = resolveIntProperty (UiOption.InstanceFailureLimit.pname( ), ping_props, job_props, ServiceManagerComponent.failure_max);
environment = resolveStringProperty (UiOption.Environment.pname( ), ping_props, job_props, null);
jvm_args_str = jvm_args_str.trim();
if ( jvm_args_str.equals("") ) {
jvm_args = null;
} else {
jvm_args = jvm_args_str.split("\\s+");
}
// global, not customizable per-pinger
this.meta_ping_rate = ServiceManagerComponent.meta_ping_rate;
this.meta_ping_stability = ServiceManagerComponent.meta_ping_stability;
}
//
// If the class is registered, read it into ducc properties and return it. Else return an
// empty ducc properties. The resolver will deal with the emptiness.
//
protected DuccProperties findRegisteredPinger(String cls)
{
String methodName = "find RegisteredPinger";
DuccProperties answer = new DuccProperties();
File f = new File(System.getProperty("DUCC_HOME") + "/resources/service_monitors/" + cls);
if ( f.exists () ) {
try {
answer.load(f.getCanonicalPath());
logger.info(methodName, sset.getId(), "Loading site-registered service monitor from", cls);
} catch (Exception e) {
logger.error(methodName, sset.getId(), "Cannot load site-registered service monitor", f.getName(), e);
return null;
}
}
return answer;
}
// this resolves the prop in either of the two props files and expands ${} against System props, which include
// everything in ducc.properties
protected String resolveStringProperty(String prop, DuccProperties ping_props, DuccProperties job_props, String deflt)
{
if ( internal_ping ) {
// internal ping only gets to adjust the ping tuning parameters
if ( ! prop.equals("service_ping_arguments") ) return ping_props.getStringProperty(prop, deflt);
}
prop = prop.trim();
//
// Search order: first, what is registered to the service,
// second, what is registered to the site-registered pinger
// third, the passed-in default
//
String val = job_props.getProperty(prop);
if ( val == null ) {
val = ping_props.getProperty(prop);
}
if ( val == null ) {
val = deflt;
}
if ( val != null ) val = val.trim();
return val;
}
protected int resolveIntProperty(String prop, DuccProperties ping_props, DuccProperties job_props, int deflt)
{
String val = resolveStringProperty(prop, ping_props, job_props, null);
return (val == null ? deflt : Integer.parseInt(val));
}
protected boolean resolveBooleanProperty(String prop, DuccProperties ping_props, DuccProperties job_props, boolean deflt)
{
String val = resolveStringProperty(prop, ping_props, job_props, Boolean.toString(deflt));
return ( val.equalsIgnoreCase("True") ||
val.equalsIgnoreCase("true") );
}
// /**
// * Test from main only
// */
// PingDriver(String props)
// {
// DuccProperties dp = new DuccProperties();
// try {
// dp.load(props);
// } catch (Exception e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// this.endpoint = dp.getStringProperty("endpoint");
// String jvm_args_str = dp.getStringProperty("service_ping_jvm_args", "");
// this.ping_class = dp.getStringProperty("service_ping_class");
// this.classpath = dp.getStringProperty("service_ping_classpath");
// jvm_args = jvm_args_str.split(" ");
// this.test_mode = true;
// }
/**
* Used by the ServiceSet state machine.
*/
public ServiceState getServiceState()
{
return this.pingState;
}
/**
* Used by the ServiceSet state machine for messages
*/
public long getId()
{
return 0;
}
public JobState getState()
{
String methodName = "getState";
switch ( pingState ) {
case Available:
return JobState.Running;
case Stopped:
return JobState.Completed;
case Waiting:
return JobState.Initializing; // not really, but not used. don't have or need a better alternative.
default:
logger.error(methodName, sset.getId(), "Unexpected state in Ping driver:", pingState);
return JobState.Completed;
}
}
public void setState(JobState s)
{
// nothing
}
public long getServiceStatisticsTimestamp()
{
return service_statistics_timestamp;
}
public IServiceStatistics getServiceStatistics()
{
return service_statistics;
}
synchronized int getMetaPingRate()
{
return meta_ping_rate;
}
public void run()
{
String methodName = "run";
if ( internal_ping ) {
// This is the default ping driver, as configured in ducc.propeties, to be run in
// an in-process thread
logger.info(methodName, sset.getId(), "Starting INTERNAL ping.");
runAsThread();
logger.info(methodName, sset.getId(), "Ending INTERNAL ping.");
} else {
// The user specified a pinger, run it as an extranal process under that user's identity
logger.info(methodName, sset.getId(), "Starting EXTERNAL ping.");
runAsProcess();
logger.info(methodName, sset.getId(), "Ending EXTERNAL ping.");
}
}
void handleResponse(Pong response)
{
String methodName = "handleStatistics";
// ***** ERROR INJECTION *****
String key = "ducc.sm.meta.ping.error.injection.missing.percentage";
String value = DuccPropertiesResolver.getInstance().getFileProperty(key);
long missingPercentage = 0;
try {
missingPercentage = Long.parseLong(value);
}
catch(Exception e) {
logger.trace(methodName, sset.getId(), e);
}
logger.trace(methodName, sset.getId(), key+"="+missingPercentage);
if(missingPercentage > 0) {
if(missingPercentage < 100) {
Random random = new Random();
int n = random.nextInt(100);
if(n < missingPercentage) {
logger.warn(methodName, sset.getId(), "skip pinger data");
return;
}
logger.warn(methodName, sset.getId(), "keep pinger data");
}
}
// ***** ERROR INJECTION *****
this.service_statistics_timestamp = response.getTimestamp();
this.service_statistics = response.getStatistics();
if ( service_statistics == null ) {
logger.error(methodName, sset.getId(), "Service statics are null!");
errors++;
return; // always a pinger error, don't let pinger affect anything
} else {
if ( service_statistics.isAlive() ) {
pingState = ServiceState.Available;
logger.info(methodName, sset.getId(), "Ping ok: ", endpoint, service_statistics.toString());
missed_pings = 0;
} else {
logger.error(methodName, sset.getId(), "Missed_pings ", ++missed_pings, "endpoint", endpoint, service_statistics.toString());
if ( missed_pings > meta_ping_stability ) {
pingState = ServiceState.Waiting;
}
}
}
// maybe it was turned off
sset.setAutostart( response.isAutostart() );
// when was the service last used?
sset.setLastUse( response.getLastUse() );
//
// Must cap additions and deletions at some reasonable value in case the monitor is too agressive or in error.
// -- additions capped at global installation max from ducc.sm.instance.max
// -- deletions capped at registered value IFF the service is autostarted
//
int additions = response.getAdditions();
int instances = sset.countImplementors();
if ( additions + instances > max_instances ) {
additions = Math.max(0, max_instances - instances);
logger.warn(methodName, sset.getId(), "Maximum services instances capped by installation limit of", max_instances, "at", additions);
}
Long[] deletions = response.getDeletions();
int ndeletions = 0;
if ( deletions != null ) {
ndeletions = deletions.length;
//UIMA-4995
// if ( sset.isAutostart() && (ndeletions > 0) ) {
// int reg_instances = meta_props.getIntProperty("instances", 1);
// if ( instances - ndeletions < reg_instances ) {
// ndeletions = Math.max(0, instances - reg_instances);
// logger.warn(methodName, sset.getId(), "Service shrink value capped by registered min of", reg_instances, "at", ndeletions);
// }
// }
// int refs = sset.countReferences();
// int impls = sset.countImplementors();
// if ( (impls <= ndeletions) && (refs > 0) ) {
// ndeletions = Math.max(0, impls - 1);
// logger.warn(methodName, sset.getId(), "Service shrink value capped at", ndeletions, "because there are still", refs, "references.");
// }
}
//TODO safe against invalid or repeated deletion IDs?
sset.signalRebalance(additions, deletions, ndeletions, response.isExcessiveFailures());
}
void expand_wildcards(List<URL> in, String cp_entry)
throws MalformedURLException
{
int ndx = cp_entry.lastIndexOf("/");
File dir = new File(cp_entry.substring(0, ndx));
if ( ! dir.exists() ) {
return;
}
File[] files = dir.listFiles();
if ( files == null || files.length == 0 ) {
return;
}
for ( File f : files ) {
if ( f.isFile() ) {
in.add(new URL("file://" + f.getPath()));
}
}
}
AServicePing loadInternalMonitor()
throws ClassNotFoundException,
IllegalAccessException,
InstantiationException,
MalformedURLException
{
String methodName = "loadInternalMonitor";
if ( classpath == null ) {
@SuppressWarnings("unchecked")
Class<AServicePing> cl = (Class<AServicePing>) Class.forName(ping_class);
return (AServicePing) cl.newInstance();
} else {
String[] cp_elems = classpath.split(":");
List<URL> cp_urls = new ArrayList<URL>();
for ( int i = 0; i < cp_elems.length; i++ ) {
if ( cp_elems[i].endsWith("*") ) {
expand_wildcards(cp_urls, cp_elems[i]);
} else {
cp_urls.add(new URL("file://" + cp_elems[i]));
}
}
if ( logger.isTrace () ) {
logger.trace(methodName, sset.getId(), "Loading internally with classpath:");
for ( URL u : cp_urls ) {
logger.trace(methodName, sset.getId(), " ", u.toString());
}
}
@SuppressWarnings("resource")
URLClassLoader l = new URLClassLoader(cp_urls.toArray(new URL[cp_urls.size()]));
@SuppressWarnings("rawtypes")
Class loaded_class = l.loadClass(ping_class);
l = null;
return (AServicePing) loaded_class.newInstance();
}
}
/**
* Process the initialization properties in two forms:
* a) into a map for use by internal pingers which won't have to parse anything.
* b) into a serialized properties string to pass as an argument to the ping main
* for external pingers.
*/
String setCommonInitProperties(Map<String, Object> props)
{
props.put("monitor-rate" , meta_ping_rate);
props.put("service-id" , sset.getId().getFriendly());
props.put("failure-max" , failure_max);
props.put("failure-window" , failure_window);
props.put("do-log" , do_log);
props.put("autostart-enabled" , sset.isAutostart());
props.put("last-use" , sset.getLastUse());
StringBuffer buf = new StringBuffer();
buf.append("monitor-rate=" ); buf.append(Integer.toString (meta_ping_rate)); buf.append(",");
buf.append("service-id=" ); buf.append(Long.toString (sset.getId().getFriendly())); buf.append(",");
buf.append("failure-max=" ); buf.append(Integer.toString (failure_max)); buf.append(",");
buf.append("failure-window=" ); buf.append(Integer.toString (failure_window)); buf.append(",");
buf.append("do-log=" ); buf.append(Boolean.toString (do_log)); buf.append(",");
buf.append("autostart-enabled="); buf.append(Boolean.toString (sset.isAutostart())); buf.append(",");
buf.append("last-use=" ); buf.append(Long.toString (sset.getLastUse()));
return buf.toString();
}
/**
* Common per-ping data: anything that can change since the last ping.
*/
void setCommonProperties(Map<String, Object> props)
{
Long[] instances = sset.getImplementors();
props.put("all-instances" , instances);
props.put("registered-instances", sset.getNInstancesRegistered());
String[] hosts = new String[instances.length];
Long[] shares = new Long[instances.length];
for ( int i = 0; i < instances.length; i++ ) {
hosts[i] = sset.getHostFor(instances[i]);
shares[i] = sset.getShareFor(instances[i]);
}
props.put("all-hosts" , hosts);
props.put("all-shares" , shares);
props.put("active-instances" , sset.getActiveInstances());
props.put("autostart-enabled", sset.isAutostart());
DuccId[] references = sset.getReferences();
Long[] refs = new Long[references.length];
for ( int i = 0; i < refs.length; i++ ) {
refs[i] = references[i].getFriendly();
}
props.put("references" , refs);
props.put("run-failures" , sset.getRunFailures());
}
void runAsThread()
{
long tid = Thread.currentThread().getId();
String methodName = "runAsThread[" + tid + "]";
AServicePing pinger = null;
Map<String, Object> initProps = new HashMap<String, Object>();
Map<String, Object> props = new HashMap<String, Object>();
try {
pinger = loadInternalMonitor();
} catch (ClassNotFoundException e1) {
logger.error(methodName, sset.getId(), "Cannot load pinger: ClassNotFoundException(", ping_class, ")");
return;
} catch (IllegalAccessException e1) {
logger.error(methodName, sset.getId(), "Cannot load pinger: IllegalAccessException(", ping_class, ")");
return;
} catch (InstantiationException e1) {
logger.error(methodName, sset.getId(), "Cannot load pinger: InstantiationException(", ping_class, ")");
return;
} catch ( MalformedURLException e1) {
logger.error(methodName, sset.getId(), "Cannot load pinger: Cannot form URLs from classpath entries(", ping_class, ")");
return;
} catch ( Throwable t ) {
logger.error(methodName, sset.getId(), "Cannot load pinger for unknown reason:", ping_class, t);
return;
}
try {
setCommonInitProperties(initProps);
pinger.setLogger(logger);
pinger.init(ping_arguments, endpoint, initProps);
while ( ! shutdown ) {
setCommonProperties(props);
pinger.setSmState(props);
Pong pr = new Pong();
pr.setStatistics ( pinger.getStatistics() );
pr.setAdditions ( pinger.getAdditions() );
pr.setDeletions ( pinger.getDeletions() );
pr.setExcessiveFailures( pinger.isExcessiveFailures() );
pr.setAutostart ( pinger.isAutostart() );
pr.setLastUse ( pinger.getLastUse() );
handleResponse(pr);
if ( errors > error_threshold ) {
pinger.stop();
logger.warn(methodName, sset.getId(), "Ping exited because of excess errors: ", errors);
break;
}
try {
Thread.sleep(meta_ping_rate);
} catch (InterruptedException e) {
// nothing, if we were shutdown we'll exit anyway, otherwise who cares
}
}
} catch ( Throwable t ) {
logger.warn(methodName, sset.getId(), t);
}
pinger = null;
sset.pingExited(errors, this);
}
public void runAsProcess()
{
long tid = Thread.currentThread().getId();
String methodName = "runAsProcess[" + tid + "]";
String cp = classpath;
String dh = System.getProperty("DUCC_HOME");
// We need the sm jar and the cli jar. The cli jar is the one with the manifest to insure
// the common and transport stuff is picked. up. We have to dig around for the right sm
// jar because of versioning on the jar names.
cp = cp + ":" + dh + "/lib/uima-ducc-cli.jar";
File libdir = new File(dh + "/lib/uima-ducc");
String[] jars = libdir.list();
if ( jars != null ) {
for ( String j : jars ) {
if ( j.contains("ducc-sm") ) {
cp = cp + ":" + dh + "/lib/uima-ducc/" + j;
continue;
}
if ( j.contains("ducc-common") ) {
cp = cp + ":" + dh + "/lib/uima-ducc/" + j;
continue;
}
}
}
try {
pinger = new PingThread();
} catch ( Throwable t ) {
logger.error(methodName, sset.getId(), "Cannot start listen socket, pinger not started.", t);
pingState = ServiceState.Stopped;
return;
}
int port = pinger.getPort();
ping_thread = new Thread(pinger);
ping_thread.setName("XTrnPingMonitor-"+ sset.getId());
ping_thread.start(); // sets up the listener, before we start the the external process
Map<String, Object> initProps = new HashMap<String, Object>();
String serprops = setCommonInitProperties(initProps);
ArrayList<String> arglist = new ArrayList<String>();
if ( ! test_mode ) {
arglist.add(System.getProperty("ducc.agent.launcher.ducc_spawn_path"));
arglist.add("-u");
arglist.add(user);
arglist.add("-w");
arglist.add(working_directory);
if ( do_log ) {
arglist.add("-f");
arglist.add(log_directory + "/services/ping/" + sset.getId());
}
arglist.add("--");
}
// Jira 4805 - run the pinger with the same environment as the service
// Tokenize & unquote the assignments, and convert to a map of environment settings after any substitutions
// Suntax errors usually caught when registered, but --modify does not check
ArrayList<String> envVarList = QuotedOptions.tokenizeList(environment, true);
Map<String, String> envMap;
try {
envMap = QuotedOptions.parseAssignments(envVarList, +1);
} catch (IllegalArgumentException e) {
logger.error(methodName, sset.getId(), "Invalid environment:", e);
pingState = ServiceState.Stopped;
return;
}
// Jira 5002 - check for user-specified JVM
String javaHome = envMap.get("JAVA_HOME");
if (javaHome != null) {
arglist.add(javaHome + "/bin/java");
} else {
arglist.add(System.getProperty("ducc.jvm"));
}
arglist.add("-DSM_MONITOR=T");
if ( jvm_args != null ) {
for ( String s : jvm_args) {
arglist.add(s);
}
}
arglist.add("-cp");
arglist.add(cp);
//arglist.add("-Xmx100M");
arglist.add("-Dcom.sun.management.jmxremote");
arglist.add("org.apache.uima.ducc.sm.ServicePingMain");
arglist.add("--class");
arglist.add(ping_class);
arglist.add("--endpoint");
arglist.add(endpoint);
arglist.add("--port");
arglist.add(Integer.toString(port));
arglist.add("--initprops");
arglist.add(serprops);
if( ping_arguments != null ) {
arglist.add("--arguments");
arglist.add(ping_arguments);
}
int i = 0;
for ( String s : arglist) {
logger.debug(methodName, sset.getId(), "Args[", i++,"]: ", s);
}
ProcessBuilder pb = new ProcessBuilder(arglist);
//
// Establish our pinger
//
InputStream stdout = null;
InputStream stderr = null;
try {
Map<String, String> env = pb.environment();
env.clear();
env.putAll(envMap);
ping_main = pb.start();
stdout = ping_main.getInputStream();
stderr = ping_main.getErrorStream();
sin_listener = new StdioListener(1, stdout);
ser_listener = new StdioListener(2, stderr);
Thread sol = new Thread(sin_listener);
Thread sel = new Thread(ser_listener);
sol.start();
sel.start();
} catch (Throwable t) {
logger.error(methodName, sset.getId(), "Cannot establish ping process:", t);
pingState = ServiceState.Stopped;
return;
}
int rc;
while ( true ) {
try {
rc = ping_main.waitFor();
ping_main = null;
if ( pingStopper != null ) {
pingStopper.cancel();
pingStopper = null;
logger.info(methodName, sset.getId(), "Pinger returned, pingStopper is canceled.");
}
logger.info(methodName, sset.getId(), "Pinger returns rc ", rc);
sset.pingExited(rc, this);
break;
} catch (InterruptedException e2) {
// nothing
}
}
// pinger.stop();
sin_listener.stop();
ser_listener.stop();
}
public void stop()
{
shutdown = true;
if ( !internal_ping ) {
if ( pinger != null ) pinger.stop();
pingStopper = new PingStopper();
if ( timer == null ) {
timer = new Timer();
}
timer.schedule(pingStopper, 60000);
}
}
private class PingStopper
extends TimerTask
{
PingStopper()
{
String methodName = "PingStopper.init";
logger.debug(methodName, sset.getId(), "Wait for pinger to exit:", 60000);
}
public void run()
{
String methodName = "PingStopper.run";
logger.debug(methodName, sset.getId(), "PingStopper kills reluctant pinger");
if ( ping_main != null ) ping_main.destroy();
}
}
class PingThread
implements Runnable
{
ServerSocket server;
int port = -1;
boolean done = false;
PingThread()
throws IOException
{
this.server = new ServerSocket(0);
this.port = server.getLocalPort();
}
int getPort()
{
return this.port;
}
synchronized void stop()
{
String methodName = "stop";
logger.info(methodName, sset.getId(), "Pinger stopping: set done = true");
this.done = true;
}
public void run()
{
long tid = Thread.currentThread().getId();
String methodName = "XtrnPingThread.run[" + tid + "]";
try {
Socket sock = server.accept();
// Socket sock = new Socket("localhost", port);
sock.setSoTimeout(meta_ping_rate); // don't timout faster than ping rate
OutputStream outs = sock.getOutputStream();
InputStream in = sock.getInputStream();
ObjectInputStream ois = new ObjectInputStream(in);
ObjectOutputStream oos = new ObjectOutputStream(outs);
Map<String, Object>props = new HashMap<String, Object>();
ping_ok = false; // we expect the callback to change this
while ( true ) {
synchronized(this) {
if ( done ) {
// Ask for the ping
try {
logger.info(methodName, sset.getId(), "ExtrnPingDriver: send QUIT to pinger.");
oos.writeObject(new Ping(true, props));
oos.flush();
//oos.reset();
} catch (IOException e1) {
logger.error(methodName, sset.getId(), e1);
}
logger.info(methodName, sset.getId(), "ExtrnPingDriver: QUIT is sent and flushed; thread exits.");
// gc will close all the descriptors and handles
//ois.close();
//oos.close();
//in.close();
//sock.close();
return;
}
}
// Ask for the ping
try {
logger.info(methodName, sset.getId(), "ExtrnPingDriver: ping OUT");
setCommonProperties(props);
oos.writeObject(new Ping(false, props));
oos.flush();
oos.reset();
} catch (IOException e1) {
logger.error(methodName, sset.getId(), e1);
errors++;
return;
}
// Try to read the response
try {
Pong resp = (Pong) ois.readObject();
logger.trace(methodName, sset.getId(), "ExtrnPingDriver: ping RECEIVED");
handleResponse(resp);
logger.trace(methodName, sset.getId(), "ExtrnPingDriver: ping HANDLED");
} catch (IOException e1) {
logger.warn(methodName, sset.getId(), "ExtrnPingDriver: Error receiving ping:", e1);
errors++;
return;
}
// Wait a bit for the next one
try {
logger.trace(methodName, sset.getId(), "ExtrnPingDriver: SLEEPING", meta_ping_rate, "ms", sset.toString());
Thread.sleep(meta_ping_rate);
logger.trace(methodName, sset.getId(), "ExtrnPingDriver: SLEEP returns", sset.toString());
} catch (InterruptedException e) {
logger.info(methodName, sset.getId(), e);
}
}
} catch (IOException e) {
logger.error(methodName, sset.getId(), "ExtrnPingDriver: Error receiving ping", e);
errors++;
} catch (ClassNotFoundException e) {
logger.error(methodName, sset.getId(), "ExtrnPingDriver: Input garbled:", e);
errors++;
}
}
}
class StdioListener
implements Runnable
{
InputStream in;
String tag;
boolean done = false;
StdioListener(int which, InputStream in)
{
this.in = in;
switch ( which ) {
case 1: tag = "STDOUT: "; break;
case 2: tag = "STDERR: "; break;
}
}
void stop()
{
this.done = true;
}
public void run()
{
if ( done ) return;
long tid = Thread.currentThread().getId();
String methodName = "StdioListener.run[" + tid + "]";
BufferedReader br = new BufferedReader(new InputStreamReader(in));
while ( true ) {
try {
String s = br.readLine();
if ( test_mode ) System.out.println(tag + s);
else logger.info(methodName, sset.getId(), tag, s);
if ( s == null ) {
String msg = tag + "closed, listener returns";
if ( test_mode ) System.out.println(msg);
else logger.info(methodName, sset.getId(), msg);
return;
}
} catch (IOException e) {
// if anything goes wrong this guy is toast.
if ( test_mode) e.printStackTrace();
else logger.error(methodName, sset.getId(), e);
return;
}
}
}
}
// public static void main(String[] args)
// {
// // arg0 = amqurl = put into -Dbroker.url
// // arg1 = endpoint - pass to ServicePingMain
// // call ServicePingMain --class org.apache.uima.ducc.sm.PingTester --endpoint FixedSleepAE_1
// // make sure test.jar is in the classpath
// PingDriver csm = new PingDriver(args[0]);
// csm.run();
// }
}