blob: f16c294dc6d7443a76422d7e2d7c4093f127a74d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.sm;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.uima.UIMAFramework;
import org.apache.uima.ducc.cli.IServiceApi.RegistrationOption;
import org.apache.uima.ducc.cli.IUiOptions.UiOption;
import org.apache.uima.ducc.cli.UimaAsPing;
import org.apache.uima.ducc.cli.UimaAsServiceMonitor;
import org.apache.uima.ducc.common.IServiceStatistics;
import org.apache.uima.ducc.common.TcpStreamHandler;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccProperties;
import org.apache.uima.ducc.common.utils.id.ADuccId;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
import org.apache.uima.ducc.transport.event.sm.IServiceDescription;
import org.apache.uima.ducc.transport.event.sm.ServiceDescription;
import org.apache.uima.util.Level;
/**
* Represents the collection of processes, jobs, and such that implement a given service.
*/
public class ServiceSet
implements SmConstants
{
/**
* Serialization version id.
* NOTE(review): ServiceSet does not visibly implement Serializable in this file - presumably
* it is inherited through SmConstants; confirm.
*/
private static final long serialVersionUID = 1L;
// Per-instance logger, obtained for the concrete class name and COMPONENT_NAME.
private DuccLogger logger = DuccLogger.getLogger(this.getClass().getName(), COMPONENT_NAME);
// key is unique id of descriptor. The descriptor inherits key from a Job's DuccId, or from
// a unique-to SM key for implicit references.
HashMap<DuccId, JobState> implementors = new HashMap<DuccId, JobState>();
// key is job/service id, value is same. it's a map for fast existence check
HashMap<DuccId, DuccId> references = new HashMap<DuccId, DuccId>();
// For a registered service, here is my registered id
DuccId id;
// Numeric "friendly" id of each implementor, mapped to its DuccId. The value is null until
// the DuccId becomes known (the constructors insert null; matches() and addImplementor() fill it in).
HashMap<Long, DuccId> friendly_ids = new HashMap<Long, DuccId>();
// Name of the meta property under which ids of former implementors are accumulated.
String history_key = "work-instances";
// incoming nodes, for dup checking
List<ServiceSet> predecessors = new ArrayList<ServiceSet>();
// outgoing nodes
List<ServiceSet> successors = new ArrayList<ServiceSet>();
// for UIMA-AS this is the endpoint as the unique identifier of this service
String key;
// UIMA-AS endpoint information
String endpoint;
String broker;
String broker_host;
int broker_port;
// JMX port handed to UimaAsServiceMonitor when clearing queues; never reassigned in the visible code.
int broker_jmx_port = 1099;
// Names of the services this one depends on, or null if none were declared.
String[] independentServices = null;
// Registered services, the submitter
String user;
// Registered services, Automatically start at boot, and keep implementors alive
boolean autostart = false;
// Registered services, we've been stopped. Must remember in order to counteract autostart.
boolean stopped = false;
// Registered services, remember if was started by reference only so we can stop when refs die
boolean referenced_start = false;
// Registered services, the number of instances to maintain
int instances = 1;
// Service pinger
IServiceMeta serviceMeta = null;
// After stopping a pinger we need to discard it,and we usually need to stop it well before
// it is ok to delete residual state such as UIMA-AS queues. Instead of discarding it, we
// stash it here to use once the last implementor seems to be dead.
IServiceMeta residualMeta = null;
// registered services state files
DuccProperties job_props = null;
DuccProperties meta_props = null;
// Paths of the files job_props and meta_props are stored to; deleteProperties() sets both to null.
String props_filename = null;
String meta_filename = null;
// Set by deregister(); only meaningful for Registered services (see isDeregistered()).
boolean deregistered = false;
ServiceType service_type = ServiceType.Undefined;
ServiceClass service_class = ServiceClass.Undefined;
ServiceState service_state = ServiceState.Undefined;;
// structures to manage service linger after it exits
Timer timer = null;
LingerTask linger = null;
// Default linger time; the registered-service constructor overrides it from the service properties.
// NOTE(review): presumably milliseconds - confirm against LingerTask/Timer usage.
long linger_time = 5000;
int failure_max = ServiceManagerComponent.failure_max;
int failure_run = 0; // max allowed consecutive failures, current failure count
//JobState job_state = JobState.Undefined;
/**
 * Constructor for an implicit service.
 *
 * Parses the key as an endpoint, rejects CUSTOM keys, marks the set as an Implicit UIMA-AS
 * service, and writes an (empty) service descriptor plus an initial meta descriptor under
 * $DUCC_HOME/state/services.
 *
 * @param key the service endpoint; also stored as the "endpoint" meta property
 * @param id  id of this service set; used to name the two state files
 * @throws IllegalStateException if the key parses as a CUSTOM service
 */
public ServiceSet(String key, DuccId id)
{
    this.key = key;
    this.id = id;

    parseEndpoint(key);
    if ( ServiceType.Custom == this.service_type ) {
        throw new IllegalStateException("Custom services may not be referenced as Implicit services.");
    }
    this.service_type = ServiceType.UimaAs;
    this.service_class = ServiceClass.Implicit;

    // Need job props and meta props for webserver.
    // The pinger is always the default configured UIMA-AS pinger.
    //
    // job props: working_directory, log_directory
    // meta props: endpoint, user
    job_props = new DuccProperties();

    // Both state files share the same stem and differ only in their suffix.
    String file_stem = System.getProperty("DUCC_HOME") + "/state" + "/services/" + id.toString();
    props_filename = file_stem + ".svc";
    saveServiceProperties();

    DuccProperties mp = new DuccProperties();
    meta_props = mp;
    mp.put("user", System.getProperty("user.name"));
    mp.put("endpoint", key);
    mp.put("service-class", "" + service_class.decode());
    mp.put("service-type", "" + service_type.decode());
    mp.put("stopped", "" + stopped);
    mp.put("service-state", "" + getServiceState());
    mp.put("ping-active", "false");
    mp.put("service-alive", "false");
    mp.put("service-healthy", "false");
    mp.put("service-statistics", "N/A");
    mp.put("ping-only", "true");

    meta_filename = file_stem + ".meta";
    saveMetaProperties();
}
/**
 * Constructor for a submitted service.
 *
 * Records the first implementor's friendly id (its DuccId is filled in later), parses the
 * key as an endpoint, and writes an (empty) service descriptor plus an initial meta
 * descriptor under $DUCC_HOME/state/services.
 *
 * Submitted services must always be UIMA-AS services, for now, checked in caller.
 *
 * @param id                  id of this service set; used to name the two state files
 * @param first_implementor   id of the first implementor
 * @param key                 the service endpoint; also stored as the "endpoint" meta property
 * @param independentServices services this one depends on, may be null
 */
public ServiceSet(DuccId id, DuccId first_implementor, String key, String[] independentServices)
{
    this.key = key;
    this.id = id;
    this.friendly_ids.put(first_implementor.getFriendly(), null);
    this.independentServices = independentServices;
    this.service_class = ServiceClass.Submitted;

    parseEndpoint(key);

    // Need job props and meta props for webserver.
    // The pinger is always the default configured UIMA-AS pinger.
    //
    // job props: working_directory, log_directory
    // meta props: endpoint, user
    job_props = new DuccProperties();

    // Both state files share the same stem and differ only in their suffix.
    String file_stem = System.getProperty("DUCC_HOME") + "/state" + "/services/" + id.toString();
    props_filename = file_stem + ".svc";
    saveServiceProperties();

    DuccProperties mp = new DuccProperties();
    meta_props = mp;
    mp.put("user", System.getProperty("user.name"));
    mp.put("endpoint", key);
    mp.put("service-class", "" + service_class.decode());
    mp.put("service-type", "" + service_type.decode());
    mp.put("stopped", "" + stopped);
    mp.put("service-state", "" + getServiceState());
    mp.put("ping-active", "false");
    mp.put("service-alive", "false");
    mp.put("service-healthy", "false");
    mp.put("service-statistics", "N/A");
    // NOTE(review): this persists the service's own id while friendly_ids above holds
    // first_implementor's - presumably the caller passes the same id for both; confirm.
    mp.put("implementors", "" + id.getFriendly());
    mp.put("ping-only", "false");

    meta_filename = file_stem + ".meta";
    saveMetaProperties();
}
/**
 * Constructor for a registered service.
 *
 * Adopts the supplied property sets and file names, restores the persisted settings
 * (linger time, failure limit, user, instance count, autostart, implementor ids), and
 * resets the volatile meta properties. Nothing is written to disk here.
 *
 * @param id             registered id of this service
 * @param props_filename file the service properties are stored in
 * @param meta_filename  file the meta properties are stored in
 * @param props          the service (job) properties
 * @param meta           the meta properties; must contain "endpoint"
 */
public ServiceSet(DuccId id, String props_filename, String meta_filename, DuccProperties props, DuccProperties meta)
{
    this.job_props = props;
    this.meta_props = meta;
    this.id = id;
    this.props_filename = props_filename;
    this.meta_filename = meta_filename;
    this.service_state = ServiceState.NotAvailable;

    this.linger_time = props.getLongProperty(RegistrationOption.ServiceLinger.decode(), 5000);
    this.key = meta.getProperty("endpoint");
    this.failure_max = props.getIntProperty(UiOption.InstanceFailuresLimit.pname(), ServiceManagerComponent.failure_max);
    parseEndpoint(key);

    this.user = meta.getProperty("user");
    this.instances = meta.getIntProperty("instances", 1);
    this.autostart = meta.getBooleanProperty("autostart", false);

    // Implementors persisted by an earlier run; their DuccIds are not known yet, hence null.
    String persisted_ids = meta.getProperty("implementors", null);
    if ( persisted_ids != null ) {
        for ( String token : persisted_ids.split("\\s") ) {
            friendly_ids.put(Long.valueOf(token), null);
        }
    }

    this.service_class = ServiceClass.Registered;
    parseIndependentServices();

    // Supply pinger defaults the registration did not specify.
    if ( !job_props.containsKey("service_ping_dolog") ) {
        job_props.put("service_ping_dolog", "false");
    }
    if ( !job_props.containsKey("service_ping_timeout") ) {
        job_props.put("service_ping_timeout", "" + ServiceManagerComponent.meta_ping_timeout);
    }

    meta_props.put("service-class", "" + service_class.decode());
    meta_props.put("service-type", "" + service_type.decode());
    meta_props.put("stopped", "" + stopped);
    meta_props.put("service-state", "" + getServiceState());
    meta_props.put("ping-active", "false");
    meta_props.put("service-alive", "false");
    meta_props.put("service-healthy", "false");
    meta_props.put("service-statistics", "N/A");
    meta_props.put("ping-only", isStartable() ? "false" : "true");

    // caller will save the meta props, **if** the rest of registration is ok.

    // there are a couple junky messages that slip by finer-grained logger configuration.
    // turn the whole danged thing off.
    UIMAFramework.getLogger().setLevel(Level.OFF);
}
/** @return the id this service set was constructed with */
DuccId getId()
{
    return this.id;
}
/**
 * Splits a service key into its parts and records the service type.
 *
 * A key starting with the UIMA-AS type prefix must have the form UIMA-AS:Endpoint:Broker; it
 * sets endpoint, broker, broker_host, broker_port and marks the service as UimaAs. Any other
 * key marks the service as Custom, and everything after the first ':' becomes the endpoint.
 *
 * Fields are only assigned once the whole key has been validated.
 *
 * @param ep the service key to parse
 * @throws IllegalArgumentException if ep is null, if a UIMA-AS key lacks the endpoint or the
 *         broker part, or if the broker is not a valid URL
 */
protected void parseEndpoint(String ep)
{
    if ( ep == null ) {
        throw new IllegalArgumentException("The endpoint cannot be parsed. Expecting UIMA-AS:Endpoint:Broker, received null");
    }

    if ( ep.startsWith(ServiceType.UimaAs.decode()) ) {
        // Strip the type prefix, then split the remainder at the next ':' into endpoint and broker.
        String remainder = ep.substring(ep.indexOf(":") + 1);
        int sep = remainder.indexOf(":");
        if ( sep < 0 ) {
            // Without this check substring(0, -1) would surface as StringIndexOutOfBoundsException.
            throw new IllegalArgumentException("The endpoint cannot be parsed. Expecting UIMA-AS:Endpoint:Broker, received " + ep);
        }

        String parsed_endpoint = remainder.substring(0, sep).trim();
        String parsed_broker = remainder.substring(sep + 1).trim();
        if ( parsed_endpoint.equals("") || parsed_broker.equals("") ) {
            throw new IllegalArgumentException("The endpoint cannot be parsed. Expecting UIMA-AS:Endpoint:Broker, received " + ep);
        }

        URL url = null;
        try {
            // The explicit handler lets the URL class accept the broker's tcp: scheme.
            url = new URL(null, parsed_broker, new TcpStreamHandler());
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Invalid broker URL: " + parsed_broker, e);
        }

        this.endpoint = parsed_endpoint;
        this.broker = parsed_broker;
        this.service_type = ServiceType.UimaAs;
        this.broker_host = url.getHost();
        this.broker_port = url.getPort();
    } else {
        this.service_type = ServiceType.Custom;
        // With no ':' present indexOf yields -1 and the whole key becomes the endpoint.
        int ndx = ep.indexOf(":");
        this.endpoint = ep.substring(ndx + 1);
    }
}
/**
 * Moves every id still listed in friendly_ids into the history property, drops the
 * "implementors" property, hands both property sets to
 * ServiceManagerComponent.deleteProperties, and forgets the two file names.
 */
synchronized void deleteProperties()
{
    // be sure to move any services that seem not to have croaked yet to history
    StringBuilder history = new StringBuilder(meta_props.getStringProperty(history_key, ""));
    for ( Long fid : friendly_ids.keySet() ) {
        history.append(" ").append(fid.toString());
    }
    meta_props.put(history_key, history.toString());
    meta_props.remove("implementors");

    ServiceManagerComponent.deleteProperties(this.id.toString(), meta_filename, meta_props, props_filename, job_props);

    // A null file name tells saveMetaProperties() that the state has been deleted.
    meta_filename = null;
    props_filename = null;
}
/** Appends sset to the list of predecessors (incoming edges). */
void setIncoming(ServiceSet sset)
{
    this.predecessors.add(sset);
}
/** Empties both the predecessor and the successor list. */
void clearEdges()
{
    this.predecessors.clear();
    this.successors.clear();
}
/** @return the current value of the stopped flag */
boolean isStopped()
{
    return this.stopped;
}
/** @return true if at least one predecessor is recorded */
boolean hasPredecessor()
{
    return !predecessors.isEmpty();
}
/**
 * @return the live predecessor list itself (not a copy), so changes made by the caller
 *         are visible here
 */
List<ServiceSet> getPredecessors()
{
    return this.predecessors;
}
/** Removes the first occurrence of pred from the predecessor list, if present. */
void removePredecessor(ServiceSet pred)
{
    this.predecessors.remove(pred);
}
/** Appends sset to the list of successors (outgoing edges). */
void setOutgoing(ServiceSet sset)
{
    successors.add(sset);
}
/**
 * @return a fresh copy of the successor list; modifying it does not affect this set
 */
List<ServiceSet> getSuccessors()
{
    List<ServiceSet> snapshot = new ArrayList<ServiceSet>(successors);
    return snapshot;
}
/** Removes the first occurrence of succ from the successor list, if present. */
void removeSuccessor(ServiceSet succ)
{
    this.successors.remove(succ);
}
/** @return true if at least one successor is recorded */
boolean hasSuccessor()
{
    return !successors.isEmpty();
}
/** @return the services this one depends on; null when none were declared */
String[] getIndependentServices()
{
    return this.independentServices;
}
/** Replaces the dependency list. Just for testing! */
void setIndependentServices(String[] ind)
{
    independentServices = ind;
}
/**
 * Reads the service-dependency property from job_props and stores its whitespace-separated
 * entries in independentServices.
 *
 * Runs of whitespace count as a single separator, so the result never contains empty
 * entries. A missing or blank property leaves independentServices null.
 */
private void parseIndependentServices()
{
    String depstr = job_props.getProperty(RegistrationOption.ServiceDependency.decode());
    String[] result = null;
    if ( depstr != null ) {
        // Trim first: split() would otherwise produce a leading empty token.
        String trimmed = depstr.trim();
        if ( trimmed.length() > 0 ) {
            result = trimmed.split("\\s+");
        }
    }
    independentServices = result;
}
/**
 * A service is startable if it's UIMA-AS, or if it's CUSTOM and has a process_executable
 * associated with it. A non-startable CUSTOM service may have only a pinger.
 *
 * @return true for UIMA-AS services and for CUSTOM services whose job properties contain
 *         "process_executable"; false for every other service type
 */
boolean isStartable()
{
    boolean startable = false;
    switch ( service_type ) {
        case UimaAs:
            startable = true;
            break;
        case Custom:
            startable = job_props.containsKey("process_executable");
            break;
        default:
            break;
    }
    return startable;
}
/**
 * At boot only ... synchronize my state with published OR state.
 *
 * Every job in work whose friendly id was persisted for this service becomes an implementor.
 * Persisted friendly ids with no matching job are appended to the history property, and
 * friendly_ids is replaced by the validated set before the result is persisted.
 *
 * @param work the currently known jobs and their states
 */
void synchronizeImplementors(Map<DuccId, JobState> work)
{
    HashMap<Long, DuccId> validated = new HashMap<Long, DuccId>();

    // first pass: adopt the live jobs that we had recorded as ours
    for ( Map.Entry<DuccId, JobState> live : work.entrySet() ) {
        DuccId did = live.getKey();
        long fid = did.getFriendly();
        if ( friendly_ids.containsKey(fid) ) {
            implementors.put(did, live.getValue());
            validated.put(fid, did);
        }
    }

    // second pass: whatever we had recorded but is no longer live (because of restart) goes to history
    StringBuilder history = new StringBuilder(meta_props.getStringProperty(history_key, ""));
    for ( Long friendly : friendly_ids.keySet() ) {
        if ( !validated.containsKey(friendly) ) {
            history.append(" ").append(friendly);
        }
    }
    meta_props.put(history_key, history.toString());

    friendly_ids = validated; // replace persisted version with validated version from OR state
    persistImplementors();
}
/**
 * Starts whatever is missing for an autostarted service.
 *
 * Does nothing unless autostart is on, the service has not been stopped, and the failure
 * limit has not been reached. A non-startable service with no pinger gets one start() call;
 * otherwise start() is called once per missing instance, stopping at the first failure.
 */
void enforceAutostart()
{
    // not doing auto, or manually stopped, or too many failures: no enforcement
    if ( !autostart || stopped || (failure_run >= failure_max) ) {
        return;
    }

    boolean ping_only_without_pinger = !isStartable() && (serviceMeta == null);
    if ( ping_only_without_pinger ) {
        start(); // ... then it needs to be started
        return;
    }

    // could have more implementors than instances if some were started dynamically but the count not persisted
    int missing = Math.max(0, instances - countImplementors());
    for ( int i = 0; i < missing; i++ ) {
        if ( !start() ) {
            break;
        }
    }
}
/**
 * Add implementor that we have an OR-assigned DuccId for.
 *
 * For submitted services the friendly-id map is updated as well. The implementor set is
 * persisted afterwards.
 *
 * @param id the implementor's id
 * @param js the implementor's current job state
 */
public void addImplementor(DuccId id, JobState js)
{
    boolean submitted = isSubmitted();
    if ( submitted ) {
        friendly_ids.put(id.getFriendly(), id);
    }
    implementors.put(id, js);
    persistImplementors();
}
/**
 * Turns an Implicit service into a Submitted one; a service that is already Submitted is
 * left alone.
 *
 * @throws IllegalStateException for any other service class
 */
void promote()
{
    String methodName = "promote";
    logger.debug(methodName, null, "Promoting", key);

    switch ( service_class ) {
        case Implicit:
            this.service_class = ServiceClass.Submitted;
            break;
        case Submitted:
            // nothing to do
            break;
        default:
            throw new IllegalStateException("Trying to promote a Registered service!");
    }
}
/** @return true if the service type is UimaAs */
boolean isUimaAs()
{
    return ServiceType.UimaAs == service_type;
}
/** @return true if the service type is Custom */
boolean isCustom()
{
    return ServiceType.Custom == service_type;
}
/** @return the live implementor map itself (not a copy) */
Map<DuccId, JobState> getImplementors()
{
    return this.implementors;
}
/** @return the service (job) properties object held by this set */
DuccProperties getJobProperties()
{
    return this.job_props;
}
/** @return the meta properties object held by this set */
DuccProperties getMetaProperties()
{
    return this.meta_props;
}
/** @return true if the service class is Implicit */
boolean isImplicit()
{
    return ServiceClass.Implicit == service_class;
}
/** @return true if the service class is Submitted */
boolean isSubmitted()
{
    return ServiceClass.Submitted == service_class;
}
/**
 * Tells whether this service has a pinger only.
 *
 * Every constructor stores "ping-only" as either "true" or "false", so the key is always
 * present; the answer therefore has to come from the stored value, not from the mere
 * existence of the key.
 *
 * @return the boolean value of the "ping-only" meta property, false if it is absent
 */
boolean isPingOnly()
{
    return meta_props.getBooleanProperty("ping-only", false);
}
/** @return true if the service class is Registered and the service has not been deregistered */
boolean isRegistered()
{
    if ( deregistered ) {
        return false;
    }
    return service_class == ServiceClass.Registered;
}
/** Records whether the service was started by reference only. */
void setReferencedStart(boolean val)
{
    referenced_start = val;
}
/** @return the referenced-start flag as last set by setReferencedStart */
boolean isReferencedStart()
{
    return referenced_start;
}
/**
 * @return the user field; only the registered-service constructor assigns it, so it may
 *         be null
 */
String getUser()
{
    return this.user;
}
/**
 * True only if
 * a) this is a registered service, and
 * b) it has been deregistered,
 * because you can't deregister a non-registered service.
 */
boolean isDeregistered()
{
    if ( !deregistered ) {
        return false;
    }
    return service_class == ServiceClass.Registered;
}
/** Sets the deregistered flag; nothing in the visible code ever clears it again. */
void deregister()
{
    this.deregistered = true;
}
/** @return the meta properties file name; null once deleteProperties() has run */
String getMetaFilename()
{
    return this.meta_filename;
}
/** @return the service properties file name; null once deleteProperties() has run */
String getPropsFilename()
{
    return this.props_filename;
}
/** @return the number of instances to maintain */
synchronized int getNInstances()
{
    return this.instances;
}
/**
 * Refreshes the volatile meta properties (stopped flag, service state, pinger status and
 * statistics) and stores the meta properties to meta_filename.
 *
 * If meta_filename is null the properties have been deleted; the call only logs a warning.
 * I/O problems are logged and otherwise ignored.
 */
synchronized void saveMetaProperties()
{
    String methodName = "saveMetaProperties";

    if ( meta_filename == null ) {
        // if this is null it was deleted and this is some kind of lingering thread updating, that
        // we don't really want any more
        logger.warn(methodName, id, "Meta properties is deleted, bypassing attempt to save.");
        return;
    }

    boolean pinging = (serviceMeta != null);
    meta_props.put("stopped", "" + stopped);
    meta_props.put("service-state", "" + getServiceState());
    meta_props.put("ping-active", "" + pinging);

    // Pessimistic defaults, overwritten below when the pinger can supply statistics.
    meta_props.put("service-alive", "false");
    meta_props.put("service-healthy", "false");
    meta_props.put("service-statistics", "N/A");

    IServiceStatistics stats = pinging ? serviceMeta.getServiceStatistics() : null;
    if ( stats != null ) {
        meta_props.put("service-alive", "" + stats.isAlive());
        meta_props.put("service-healthy", "" + stats.isHealthy());
        meta_props.put("service-statistics", "" + stats.getInfo());
    }

    FileOutputStream out = null;
    try {
        out = new FileOutputStream(meta_filename);
        meta_props.store(out, "Meta descriptor");
    } catch (FileNotFoundException e) {
        logger.warn(methodName, id, "Cannot save meta properties, file does not exist.");
    } catch (IOException e) {
        logger.warn(methodName, id, "I/O Error saving meta properties:", e);
    } finally {
        if ( out != null ) {
            try {
                out.close();
            } catch (IOException ignored) {
                // nothing useful can be done if close fails
            }
        }
    }
}
/**
 * Stores the service (job) properties to props_filename.
 *
 * If props_filename is null the properties have been deleted (deleteProperties() clears the
 * name); the call only logs a warning instead of failing inside FileOutputStream, mirroring
 * what saveMetaProperties() does. I/O problems are logged and otherwise ignored.
 */
void saveServiceProperties()
{
    String methodName = "saveServiceProperties";

    if ( props_filename == null ) {
        logger.warn(methodName, id, "Service properties is deleted, bypassing attempt to save.");
        return;
    }

    FileOutputStream fos = null;
    try {
        fos = new FileOutputStream(props_filename);
        job_props.store(fos, "Service descriptor");
    } catch (FileNotFoundException e) {
        logger.warn(methodName, id, "Cannot save service properties, file does not exist.");
    } catch (IOException e) {
        logger.warn(methodName, id, "I/O Error saving service properties:", e);
    } finally {
        if ( fos != null ) {
            try {
                fos.close();
            } catch (IOException ignored) {
                // nothing useful can be done if close fails
            }
        }
    }
}
/**
 * Changes the number of instances to maintain and persists it, but only when it differs
 * from the value currently recorded.
 *
 * The implicit and submitted constructors never store an "instances" property, so the
 * lookup falls back to the in-memory count rather than relying on how the one-argument
 * lookup treats a missing key.
 *
 * @param n the new instance count
 */
synchronized void setNInstances(int n)
{
    int recorded = meta_props.getIntProperty("instances", this.instances);
    if ( n != recorded ) {
        meta_props.setProperty("instances", Integer.toString(n));
        this.instances = n;
        saveMetaProperties();
    }
}
/**
 * Sets the autostart flag, persists it, and - when switching it on - clears the
 * consecutive-failure counter.
 *
 * @param auto the new autostart setting
 */
synchronized void setAutostart(boolean auto)
{
    this.autostart = auto;
    meta_props.setProperty("autostart", Boolean.toString(auto));
    saveMetaProperties();

    // turning this on gives benefit of the doubt on failure management
    if ( auto ) {
        failure_run = 0;
    }
}
/** @return the current autostart setting */
synchronized boolean isAutostart()
{
    return this.autostart;
}
/**
 * Writes the friendly ids of the implementors into the "implementors" meta property as a
 * space-separated list (or removes the property when there are none) and saves the meta
 * properties. Implicit services persist nothing.
 */
synchronized void persistImplementors()
{
    if ( isImplicit() ) {
        return;
    }

    if ( friendly_ids.isEmpty() ) {
        meta_props.remove("implementors");
    } else {
        StringBuilder joined = new StringBuilder();
        for ( Long fid : friendly_ids.keySet() ) {
            if ( joined.length() > 0 ) {
                joined.append(" ");
            }
            joined.append(Long.toString(fid));
        }
        meta_props.setProperty("implementors", joined.toString());
    }
    saveMetaProperties();
}
/**
 * Writes the ids of all referencing jobs/services into the "references" meta property as
 * a space-separated list (or removes the property when there are none) and saves the meta
 * properties.
 */
synchronized void persistReferences()
{
    if ( references.isEmpty() ) {
        meta_props.remove("references");
    } else {
        StringBuilder joined = new StringBuilder();
        String separator = "";
        for ( DuccId ref : references.keySet() ) {
            joined.append(separator).append(ref.toString());
            separator = " ";
        }
        meta_props.setProperty("references", joined.toString().trim());
    }
    saveMetaProperties();
}
/**
 * If we are a registered service, and one of the friendly ids recorded for us matches
 * the friendly id of the DuccId passed in, then this ServiceSet is the representative
 * object for the service. On a match the full DuccId is recorded against the friendly id.
 *
 * @param did the id to test
 * @return true if did belongs to this registered service
 */
boolean matches(DuccId did)
{
    if ( !isRegistered() ) {
        return false;
    }

    long friendly = did.getFriendly();
    boolean ours = friendly_ids.containsKey(friendly);
    if ( ours ) {
        friendly_ids.put(friendly, did);
    }
    return ours;
}
/**
 * @return true if the friendly id of the given id is recorded for this service
 */
boolean containsImplementor(DuccId id)
{
    // must use friendly, in case the thing was just started and not into the implementors set yet
    long friendly = id.getFriendly();
    return friendly_ids.containsKey(friendly);
}
/**
 * Removes an implementor, moves its id to the history property, and persists the result.
 *
 * When the last implementor goes away the pinger is stopped. If, in addition, this is a
 * startable registered service or a submitted service, of type UIMA-AS, using the default
 * UimaAsPing pinger, the service's queues are cleared through a UimaAsServiceMonitor.
 * A failure to clear the queues is logged and otherwise ignored.
 *
 * @param id the implementor to remove; unknown ids are ignored
 */
public void removeImplementor(DuccId id)
{
    String methodName = "removeImplementors";

    if ( ! implementors.containsKey(id ) ) return; // quick short circuit if it's already gone

    logger.debug(methodName, this.id, "Removing implementor", id);
    implementors.remove(id);
    friendly_ids.remove(id.getFriendly());
    if ( implementors.size() == 0 ) {
        stopPingThread();
    }

    String history = meta_props.getStringProperty(history_key, "");
    history = history + " " + id.toString();
    meta_props.put(history_key, history);
    persistImplementors();

    //
    // So much to check here
    // - all implementors gone?
    // - is it a registered, not-ping-only service, or a submitted service (startable)
    // - is it a uima-as service, with an internal pinger?
    // Only if all that is true, we'll clear out the queues.
    //
    // Either no pinger is specified, in which case the default is used, or it is specified, and if
    // it matches the default, we get to clear anyway.
    String default_pingclass = UimaAsPing.class.getName();
    String pingclass = job_props.getStringProperty("service_ping_class", default_pingclass);

    logger.debug(methodName, id, "implementors.size()", implementors.size(),
                 "service_class", service_class,
                 "isStartable()", isStartable(),
                 "isSubmitted()", isSubmitted(),
                 "service_type", service_type,
                 "ping_class", pingclass
                 );

    // Is one of our happy cases (not ping-only, we don't know much about it.)
    boolean managed = ((service_class == ServiceClass.Registered) && isStartable()) || isSubmitted();

    // This block is to clear the service queue from ActiveMq if it's no longer being used.
    if ( (implementors.size() == 0)
         && managed
         && (service_type == ServiceType.UimaAs)
         && pingclass.equals(default_pingclass) )
    {
        UimaAsServiceMonitor monitor = new UimaAsServiceMonitor(endpoint, broker_host, broker_jmx_port);
        logger.debug(methodName, id, "Clearing queues");
        try {
            monitor.init(null);
            monitor.clearQueues();
        } catch (Throwable e) {
            // Best effort only: report through the logger and carry on.
            logger.warn(methodName, id, "Cannot clear queues for endpoint", endpoint, e);
        }
    }
}
/**
 * @return the number of friendly ids recorded for this service
 */
public synchronized int countImplementors()
{
    // The implementors and friendly_ids sets track each other carefully. The former
    // tracks process state via the remote process's DuccId. The latter tracks
    // what we've tried to start, but may not know the actual state for until we get
    // Orchestrator publications.
    //
    // To avoid ugly races, we consider a process to be an implementor as soon as we
    // get the "friendly" id from the Orchestrator.
    int known = friendly_ids.size();
    return known;
}
/**
 * Records that the given id references this service, cancelling a pending linger task if
 * there is one, and persists the reference set.
 *
 * @param id the referencing job or service
 * @return the number of references after the addition
 */
public synchronized int reference(DuccId id)
{
    String methodName = "reference";
    logger.debug(methodName, this.id, " ---------------- Service ", this.id, "references", id);

    LingerTask pending = linger;
    if ( pending != null ) {
        logger.debug(methodName, this.id, " ---------------- Canceling linger task");
        pending.cancel();
        linger = null;
    }

    references.put(id, id);
    persistReferences();
    return references.size();
}
/**
 * Drops the reference held by the given id and persists the reference set. When the last
 * reference to an implicit service, or to a non-startable CUSTOM service, disappears the
 * pinger is stopped.
 *
 * @param id the job or service that no longer references this one
 * @return the number of references left
 * @throws IllegalStateException if id was not a reference
 */
public synchronized int dereference(DuccId id)
{
    DuccId removed = references.remove(id);
    if ( removed == null ) {
        throw new IllegalStateException("Id " + id + " not found in map for " + getKey());
    }

    // stop the pinger if no longer needed
    if ( references.isEmpty() ) {                                  // nothing left
        if ( isImplicit() || (isCustom() && !isStartable()) ) {    // implicit UIMA-AS || implicit CUSTOM
            // must stop pinger because there's no state machine
            // to do it for us in this situation
            stopPingThread();
        }
    }

    persistReferences();
    return references.size();
}
/**
 * @return the number of references at the moment of the call
 */
public synchronized int countReferences()
{
    // note that this could change as soon as you get it so don't count on it being correct
    // this is intended only for messages that don't have to be too accurate
    int current = references.size();
    return current;
}
/**
 * Maps the job state of an implementor to the service state it stands for.
 *
 * @param js the implementor's job state
 * @return Waiting, Initializing or Available for the corresponding job states, and
 *         NotAvailable for everything else
 */
private ServiceState translateJobState(JobState js)
{
    ServiceState translated;
    switch ( js ) {
        case Received:                // Job has been vetted, persisted, and assigned unique Id
        case WaitingForDriver:        // Process Manager is launching Job Driver
        case WaitingForServices:      // Service Manager is checking/starting services for Job
            translated = ServiceState.Waiting;
            break;

        case WaitingForResources:     // Scheduler is assigning resources to Job
        case Initializing:            // Process Agents are initializing pipelines
            translated = ServiceState.Initializing;
            break;

        case Running:                 // At least one Process Agent has reported process initialization complete
            translated = ServiceState.Available;
            break;

        default:                      // Completing, Completed, Undefined, and anything else
            translated = ServiceState.NotAvailable;
            break;
    }
    return translated;
}
/**
 * The MAX of the states of the implementors, as ranked by ServiceState.ordinality().
 * With no implementors the result is NotAvailable. While the service is stopped the result
 * is capped at the current service state.
 */
private ServiceState cumulativeJobState()
{
    ServiceState best = ServiceState.NotAvailable;
    for ( JobState js : implementors.values() ) {
        ServiceState candidate = translateJobState(js);
        if ( candidate.ordinality() > best.ordinality() ) {
            best = candidate;
        }
    }

    if ( !isStopped() ) {
        return best;
    }

    // if we're stopped, we cannot advance global state beyond whatever it is, i.e. an "active"
    // implementor that hasn't quit yet is not allowed to progress state.
    ServiceState current = getServiceState();
    return ( current.ordinality() < best.ordinality() ) ? current : best;
}
// synchronized void establish()
// {
// if ( implementors.size() == 0 ) {
// startPingThread();
// }
// // Otherwise we let the form that passes in implementor state start the pinger
// }
/**
 * Brings the service up according to its class.
 *
 * Implicit services just get a pinger. Submitted services run the per-implementor state
 * machine for each known implementor. Registered services do the same when implementors
 * are known or expected; otherwise a CUSTOM service gets a pinger and any other type gets
 * start() called once per configured instance.
 */
public synchronized void establish()
{
    String methodName = "establish.0";
    logger.debug(methodName, id, "service_class", service_class, "nimplementors", implementors.size(),
                 "instances", instances);

    switch ( service_class ) {
        case Implicit:
            startPingThread();
            break;

        case Submitted:
            for ( Map.Entry<DuccId, JobState> impl : implementors.entrySet() ) {
                establish(impl.getKey(), impl.getValue());
            }
            break;

        case Registered:
            // use friendly to find number of potential instances because it reflects both
            // the number of actually running instances plus those just started but not yet
            // checked with their ducc ids yet.
            if ( friendly_ids.size() > 0 ) {
                for ( Map.Entry<DuccId, JobState> impl : implementors.entrySet() ) {
                    establish(impl.getKey(), impl.getValue());
                }
            } else if ( service_type == ServiceType.Custom ) {
                startPingThread();
            } else {
                int needed = Math.max(0, instances - friendly_ids.size());
                for ( int i = 0; i < needed; i++ ) {
                    start();
                }
            }
            break;
    }
}
/**
* Per-implementor state machine: records the job state reported for one implementor and
* moves the service state (and the pinger) accordingly.
*
* Implicit services only get their pinger started. For submitted services, ids that are not
* (or no longer) in the implementor map are ignored. Otherwise the new job state is stored,
* the cumulative state of all implementors is computed, and the transition is chosen from
* the pair (current service state, cumulative state).
*
* @param id        the implementor whose state is being reported
* @param job_state the implementor's current job state
*/
public synchronized void establish(DuccId id, JobState job_state)
{
String methodName = "establish.1";
// Implicit services have no implementors to track; just make sure the pinger runs.
if ( service_class == ServiceClass.Implicit ) {
startPingThread();
return;
}
if ( service_class == ServiceClass.Submitted ) {
//
// Annoying edge case - cancel a service, then resubmit before the first one has time
// to wind down - need to be sure that the winding-down service is not handled by the
// new instance's state machine.
//
if ( ! implementors.containsKey(id) ) {
logger.debug(methodName, id, "Submitted service: Skipping state machine because the service set has no implemetor for", id);
return;
}
}
if ( true ) {
// Record the reported state first so that it is part of the cumulative state below.
implementors.put(id, job_state);
ServiceState cumulative = cumulativeJobState();
//
// Note on the CUMULATIVE state: this is the cumulative state as determined by service processes. If they
// should all die at once through some temporary glitch the state could go to Unavailable even though the
// SM would now be in active retry - the states below avoid regression state if CUMULATIVE goes to
// Unavailable but the retry count indicates retry is still in progress.
//
//
// The ping state is pretty much always the right state. But if we're
// not yet pinging we need to see if any of the implementors states
// indicates we should be pinging, in which case, start the pinger.
//
logger.debug(methodName, id, "serviceState", getServiceState(), "cumulativeState", cumulative);
// Outer switch: where the service is now. Inner switches: where the implementors say it should be.
switch ( getServiceState() ) {
// If I'm brand new and something is initting then I can be too. If something is
// actually running then I can start a pinger which will set my state.
case Undefined:
switch ( cumulative ) {
case Initializing:
setServiceState(ServiceState.Initializing);
break;
case Available:
startPingThread();
break;
case Waiting:
setServiceState(ServiceState.Waiting);
break;
case NotAvailable:
break;
}
break;
// If I'm initting and now something is running we can start a pinger
case Initializing:
switch ( cumulative ) {
case Available:
startPingThread();
break;
case Initializing:
break;
case Waiting:
setServiceState(ServiceState.Initializing);
break;
case NotAvailable:
// Only give up once the failure limit is reached; until then a retry is assumed to be under way.
if ( failure_run >= failure_max ) {
setServiceState(ServiceState.NotAvailable);
stopPingThread();
} else {
// don't regress if we're in retry
logger.info(methodName, id, "RETRY RETRY RETRY prevents state regression from Initializing");
}
break;
}
break;
case NotAvailable:
switch ( cumulative ) {
case Available:
// startPingThread() sets the state to Waiting when it actually starts a pinger;
// the next line then overrides that with Available.
startPingThread();
setServiceState(ServiceState.Available);
break;
case Initializing:
setServiceState(ServiceState.Initializing);
break;
case Waiting:
setServiceState(ServiceState.Waiting);
break;
case NotAvailable:
break;
}
break;
case Available:
switch ( cumulative ) {
case Available:
// No-op if a pinger is already running.
startPingThread();
break;
case Initializing:
// Not immediately clear what would cause this other than an error but let's not crash.
logger.warn(methodName, id, "STATE REGRESSION:", getServiceState(), "->", cumulative); // can't do anything about it but complain
setServiceState(ServiceState.Initializing);
break;
case NotAvailable:
if ( failure_run >= failure_max ) {
setServiceState(ServiceState.NotAvailable);
stopPingThread();
} else {
// don't regress if we're in retry
logger.info(methodName, id, "RETRY RETRY RETRY prevents state regression from Available");
}
break;
}
break;
case Waiting:
switch ( cumulative ) {
case Available:
break;
case Initializing:
// state regression can happen with a promoted implicit service, where the service is referenced
// and a pinger starts before the actual service is available. the ping keeps us in 'waiting' state
// up until expiration, but if the service is actually initializing, we regress.
logger.warn(methodName, id, "STATE REGRESSION:", getServiceState(), "->", cumulative); // can't do anything about it but complain
setServiceState(ServiceState.Initializing);
break;
case Waiting:
break;
case NotAvailable:
if ( failure_run >= failure_max ) {
setServiceState(ServiceState.NotAvailable);
} else {
// don't regress if we're in retry
logger.info(methodName, id, "RETRY RETRY RETRY prevents state regression from Waiting");
}
// NOTE(review): unlike the Initializing and Available cases, the pinger is stopped here
// even while retry is still in progress - confirm this is intended.
stopPingThread();
break;
}
break;
case Stopping:
switch ( cumulative ) {
case Available:
case Initializing:
case Waiting:
break;
case NotAvailable:
// all the implementors died finally
setServiceState(ServiceState.NotAvailable);
break;
}
break;
}
}
}
/** @return the current service state */
synchronized ServiceState getServiceState()
{
    return this.service_state;
}
/**
 * Sets the service state and immediately saves the meta properties.
 *
 * @param ss the new service state
 */
synchronized void setServiceState(ServiceState ss)
{
    service_state = ss;
    saveMetaProperties();
}
/** @return the key (service endpoint string) of this service set */
synchronized String getKey()
{
    return this.key;
}
/** Clears the consecutive-failure counter. */
void resetRunFailures()
{
    this.failure_run = 0;
}
/**
 * Counts one more failure and reports whether the limit has now been reached.
 *
 * @return true if, after incrementing, failure_run is at or above failure_max
 */
synchronized boolean excessiveRunFailures()
{
    String methodName = "runFailures";

    failure_run = failure_run + 1;
    boolean exceeded = (failure_run >= failure_max);
    if ( exceeded ) {
        logger.debug(methodName, id, "RUN FAILURES EXCEEDED");
    } else {
        logger.debug(methodName, id, "RUN FAILURES NOT EXCEEDED YET", failure_run);
    }
    return exceeded;
}
/**
 * Creates a PingDriver for this service and runs it on a new thread, unless a pinger is
 * already present. On success the service state is set to Waiting before the thread is
 * started; if the driver cannot be created the error is logged and nothing else happens.
 */
private void startPingThread()
{
    String methodName = "startPingThread";

    if ( serviceMeta != null ) {
        return; // don't start multiple times.
    }

    try {
        logger.info(methodName, id, "Starting ping/monitor.");
        serviceMeta = new PingDriver(this);
    } catch ( Throwable t ) {
        logger.error(methodName, id, "Cannot instantiate ping/monitor.", t);
        return;
    }

    setServiceState(ServiceState.Waiting);
    Thread pinger = new Thread(serviceMeta);
    pinger.start();
}
/**
 * Bookkeeping for a pinger that has ended.
 *
 * If the pinger is still registered here it exited on its own: the state becomes Undefined
 * and the pinger is parked in residualMeta. Otherwise, unless the service is stopped, the
 * state becomes NotAvailable. Finally the properties are deleted for implicit services and
 * saved for all others.
 */
synchronized void pingExited()
{
    String methodName = "pingExited";

    if ( serviceMeta != null ) {
        logger.warn(methodName, id, "Pinger exited voluntarily, setting state to Undefined. Endpoint", endpoint);
        // not really sure what state is. it will be checked and updated next run through the
        // main state machine, and maybe ping restarted.
        setServiceState(ServiceState.Undefined);
        residualMeta = serviceMeta;
        serviceMeta = null;
    } else if ( !isStopped() ) {
        // state may still be Stopping, don't over regress state
        setServiceState(ServiceState.NotAvailable);
    }

    if ( isImplicit() ) {
        deleteProperties();
        return;
    }
    saveMetaProperties();
}
/**
 * Stop the ping driver, if one is installed, and park it in residualMeta.
 *
 * Afterwards the properties are saved when the service is registered and
 * deleted when it is not.
 */
public synchronized void stopPingThread()
{
    String methodName = "stopPingThread";

    if ( serviceMeta != null ) {
        logger.debug(methodName, id, "Stopping ping thread, endpoint", endpoint);
        serviceMeta.stop();
        residualMeta = serviceMeta;
        serviceMeta = null;
    }

    if ( isRegistered() ) {
        saveMetaProperties();
    } else {
        deleteProperties();
    }
}
/**
 * Mark the service Available.
 *
 * setServiceState() already persists the meta properties, so no second
 * save is issued here.
 */
synchronized void setResponsive()
{
    setServiceState(ServiceState.Available);
}
/**
 * Mark the service NotAvailable.
 *
 * setServiceState() already persists the meta properties, so no second
 * save is issued here.
 */
synchronized void setUnresponsive()
{
    setServiceState(ServiceState.NotAvailable);
}
/**
 * Move the service to Waiting, but only from Available, NotAvailable or Undefined.
 *
 * From Initializing, Waiting, Stopping, or any other state the call is a no-op.
 */
synchronized void setWaiting()
{
    ServiceState current = getServiceState();

    switch ( current ) {
        case Initializing:
        case Waiting:
        case Stopping:
            // leave these states alone
            return;

        case Available:
        case NotAvailable:
        case Undefined:
            setServiceState(ServiceState.Waiting);
            return;

        default:
            return;
    }
}
// boolean ping()
// {
// //String methodName = "ping";
// boolean answer = true;
//
// // Instantiate Uima AS Client
// BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
// Map<String, Object> appCtx = new HashMap<String, Object>();
// appCtx.put(UimaAsynchronousEngine.ServerUri, broker);
// appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
// appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, ServiceManagerComponent.meta_ping_timeout); // 500 ms should be enough to get GetMeta reply
//
// try {
// // this sends GetMeta request and blocks waiting for a reply
// uimaAsEngine.initialize(appCtx);
// // logger.info(methodName, null, "Dependent Service Available:", getKey());
// } catch( ResourceInitializationException e) {
// // either broker is down or service not available
// // logger.error(methodName, null, "Remote service unavailable:", getKey());
// answer = false;
// } finally {
// uimaAsEngine.stop();
// }
//
// return answer;
// }
/**
 * Hand a block of text to the ducc_spawn helper, run as the service's user, targeting
 * the file service.err.log inside logdir.
 *
 * The helper is invoked with "-f <file> -a -- <text>"; presumably -a means append --
 * confirm against the ducc_spawn documentation.
 *
 * The helper's stdout and stderr are merged and drained before waiting for it to exit, so
 * it cannot block on a full pipe, and the pipe is closed afterwards.  Failures are logged
 * and never propagated; if the calling thread is interrupted its interrupt status is restored.
 *
 * @param logdir directory that holds service.err.log
 * @param text   the text passed to the helper as its final argument
 */
void log_text(String logdir, String text)
{
    String methodName = "log_text";
    String[] args = {
        System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
        "-u",
        user,
        "-f",
        logdir + "/service.err.log",
        "-a",
        "--",
        text
    };

    ProcessBuilder pb = new ProcessBuilder(args);
    pb.redirectErrorStream(true);       // a single pipe to drain

    BufferedReader reader = null;
    try {
        Process p = pb.start();

        // Consume whatever the helper prints before waiting on it.
        reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
        String line = null;
        while ( (line = reader.readLine()) != null ) {
            logger.debug(methodName, null, "Log helper output:", line);
        }

        int rc = p.waitFor();
        logger.debug(methodName, null, "Log start errors returns with rc", rc);
    } catch (IOException e) {
        logger.error(methodName, null, e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();     // keep the interrupt visible to our caller
        logger.error(methodName, null, e);
    } finally {
        if ( reader != null ) {
            try {
                reader.close();
            } catch (IOException ignored) {
                // nothing useful can be done about a failed close
            }
        }
    }
}
/**
 * Write the captured output of a failed instance start to the service's error log.
 *
 * Two calls to log_text() are made, both stamped with the same timestamp: first a
 * header followed by the stdout lines, then a stderr separator, the stderr lines and
 * a closing trailer.  The log directory is read from the job properties.
 *
 * @param outlines lines captured from the start command's stdout
 * @param errlines lines captured from the start command's stderr
 */
void log_errors(List<String> outlines, List<String> errlines)
{
    String stamp  = new Timestamp(new Date().getTime()).toString();
    String logdir = job_props.getProperty(UiOption.LogDirectory.pname());

    // header + stdout
    StringBuilder out_text = new StringBuilder();
    out_text.append("==========")
            .append(" Instance Startup Failure (stdout) ")
            .append(stamp)
            .append(" ========================================\n");
    for ( String out_line : outlines ) {
        out_text.append(out_line).append("\n");
    }
    log_text(logdir, out_text.toString());

    // stderr + trailer
    StringBuilder err_text = new StringBuilder();
    err_text.append("----------")
            .append("(stderr) ")
            .append(stamp)
            .append(" ----------------------------------------\n");
    for ( String err_line : errlines ) {
        err_text.append(err_line).append("\n");
    }
    err_text.append("==========")
            .append(" End Startup Failure ")
            .append(stamp)
            .append(" ========================================\n");
    log_text(logdir, err_text.toString());
}
/**
 * Start one instance of this service.
 *
 * This assumes the caller has already verified that I'm a registered service.
 *
 * A service that is not startable only gets establish() called.  Otherwise
 * DuccServiceSubmit is run through ducc_spawn as the service's user and its stdout is
 * searched for a "Service <n> submitted" line.  On success the numeric id is recorded in
 * friendly_ids, the implementors are persisted and the state becomes Initializing.  On
 * failure the output after the "1001 Command launching..." line is stored in the
 * "submit_error" meta property, autostart is turned off and the captured output is written
 * to the service's error log.
 *
 * @return always true; failure is reported only through the side effects described above.
 *         NOTE(review): the original code never set its result to false either -- confirm
 *         whether callers expect false on a failed submit.
 */
boolean start()
{
    String methodName = "start";
    logger.debug(methodName, null, "START START START START START START START START");
    this.stopped = false;

    if ( ! isStartable() ) {
        establish();                    // this will just start the ping thread
        return true;
    }

    // Simple use of ducc_ling, just submit as the user.  The specification will have the working directory
    // and classpath needed for the service, handled by the Orchestrator and Job Driver.
    String[] args = {
        System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
        "-u",
        user,
        "--",
        System.getProperty("ducc.jvm"),
        "-cp",
        System.getProperty("java.class.path"),
        "org.apache.uima.ducc.cli.DuccServiceSubmit",
        "--specification",
        props_filename
    };

    for ( int i = 0; i < args.length; i++ ) {
        if ( i > 0 && (args[i-1].equals("-cp") ) ) {
            // The classpaths can be just awful filling the logs with junk.  It will end up in the agent log
            // anyway so let's inhibit it here.
            logger.debug(methodName, null, "Args[", i, "]: <CLASSPATH>");
        } else {
            logger.debug(methodName, null, "Args[", i, "]:", args[i]);
        }
    }

    ProcessBuilder pb = new ProcessBuilder(args);
    Map<String, String> env = pb.environment();
    env.put("DUCC_HOME", System.getProperty("DUCC_HOME"));

    ArrayList<String> stdout_lines = new ArrayList<String>();
    ArrayList<String> stderr_lines = new ArrayList<String>();
    try {
        int rc = spawnAndCollect(pb, stdout_lines, stderr_lines);
        logger.debug(methodName, null, "DuccServiceSubmit returns with rc", rc);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();     // keep the interrupt visible to our caller
        logger.error(methodName, null, e);
    } catch (Throwable t) {
        logger.error(methodName, null, t);
    }

    for ( String s : stderr_lines ) {
        logger.info(methodName, id, "Start stderr:", s);
    }

    // That was annoying.  Now search the lines for some hint of the id.
    boolean inhibit_cp = false;
    boolean started = false;
    StringBuffer submit_buffer = new StringBuffer();
    boolean recording = false;

    for ( String s : stdout_lines ) {

        // simple logic to inhibit printing the danged classpath
        if ( inhibit_cp ) {
            inhibit_cp = false;
            logger.info(methodName, id, "<INHIBITED CP>");
        } else {
            logger.info(methodName, id, "Start stdout:", s);
        }

        if ( s.indexOf("-cp") >= 0 ) {
            inhibit_cp = true;
        }

        // everything after the launch banner is kept in case the submit fails
        if ( recording ) {
            submit_buffer.append(s.trim());
            submit_buffer.append(";");
        }
        if ( s.startsWith("1001 Command launching...") ) {
            recording = true;
            continue;
        }

        if ( s.startsWith("Service") && s.endsWith("submitted") ) {
            String[] toks = s.split("\\s");
            if ( toks.length < 2 ) {
                // no second token to hold the id; treat it like any other unreadable response
                logger.warn(methodName, null, "Request to start service " + id.toString() + " failed, can't interpret response.: " + s);
            } else {
                try {
                    long friendly = Long.parseLong(toks[1]);
                    friendly_ids.put(friendly, null);
                    persistImplementors();
                    started = true;
                    logger.info(methodName, null, "Request to start service " + id.toString() + " accepted as job ", friendly);
                } catch ( NumberFormatException e ) {
                    logger.warn(methodName, null, "Request to start service " + id.toString() + " failed, can't interpret response.: " + s);
                }
            }
        }
    }

    if ( ! started ) {
        logger.warn(methodName, null, "Request to start service " + id.toString() + " failed.");
        meta_props.put("submit_error", submit_buffer.toString());
        setAutostart(false);
        log_errors(stdout_lines, stderr_lines);
    } else {
        meta_props.remove("submit_error");
        setServiceState(ServiceState.Initializing);
    }
    saveMetaProperties();

    logger.debug(methodName, null, "ENDSTART ENDSTART ENDSTART ENDSTART ENDSTART ENDSTART");
    return true;
}

/**
 * Cancel one implementor by running DuccServiceCancel through ducc_spawn as the service's
 * user, then log whatever the command printed.
 *
 * This assumes the caller has already verified that I'm a registered service.
 *
 * @param id the id handed to DuccServiceCancel via --id; it is also used to tag the log lines
 */
void stopOneProcess(DuccId id)
{
    String methodName = "stop";
    String[] args = {
        System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
        "-u",
        user,
        "--",
        System.getProperty("ducc.jvm"),
        "-cp",
        System.getProperty("java.class.path"),
        "org.apache.uima.ducc.cli.DuccServiceCancel",
        "--id",
        id.toString()
    };

    for ( int i = 0; i < args.length; i++ ) {
        if ( i > 0 && (args[i-1].equals("-cp") ) ) {
            // The classpaths can be just awful filling the logs with junk.  It will end up in the agent log
            // anyway so let's inhibit it here.
            logger.debug(methodName, null, "Args[", i, "]: <CLASSPATH>");
        } else {
            logger.debug(methodName, null, "Args[", i, "]:", args[i]);
        }
    }

    ProcessBuilder pb = new ProcessBuilder(args);
    Map<String, String> env = pb.environment();
    env.put("DUCC_HOME", System.getProperty("DUCC_HOME"));

    ArrayList<String> stdout_lines = new ArrayList<String>();
    ArrayList<String> stderr_lines = new ArrayList<String>();
    try {
        int rc = spawnAndCollect(pb, stdout_lines, stderr_lines);
        logger.debug(methodName, null, "DuccServiceCancel returns with rc", rc);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();     // keep the interrupt visible to our caller
        logger.error(methodName, null, e);
    } catch (Throwable t) {
        logger.error(methodName, null, t);
    }

    boolean inhibit_cp = false;
    for ( String s : stdout_lines ) {
        // simple logic to inhibit printing the danged classpath
        if ( inhibit_cp ) {
            inhibit_cp = false;
            logger.info(methodName, id, "<INHIBITED CP>");
        } else {
            logger.info(methodName, id, "Stop stdout:", s);
        }

        if ( s.indexOf("-cp") >= 0 ) {
            inhibit_cp = true;
        }
    }

    for ( String s : stderr_lines ) {
        logger.info(methodName, id, "Stop stderr:", s);
    }

    // is this the last implementor? if so the service is no longer available.
    // should not have to do this, the state should update correctly in the state machine
    //if ( implementors.size() <= 1 ) {
    //    service_state = ServiceState.NotAvailable;
    //}
    //return new ServiceReplyEvent(ServiceCode.OK, "Start service " + id.toString() + " complete.", toks[1], null);
}

/**
 * Run the command described by pb to completion, collecting its stdout and stderr as lines.
 *
 * stderr is drained on a helper thread while the calling thread drains stdout, and only
 * then is the exit code awaited.  Waiting first and reading afterwards can hang for good
 * once the child has filled one of its output pipes.
 *
 * @param pb           the fully configured command to launch
 * @param stdout_lines receives each line the child writes to stdout
 * @param stderr_lines receives each line the child writes to stderr; filled by the helper
 *                     thread, which has been joined by the time this method returns normally
 * @return the child's exit code
 * @throws IOException          if the child cannot be started or its stdout cannot be read
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private int spawnAndCollect(ProcessBuilder pb, List<String> stdout_lines, final List<String> stderr_lines)
    throws IOException, InterruptedException
{
    final String methodName = "spawnAndCollect";
    final Process p = pb.start();

    Thread stderr_drain = new Thread(new Runnable() {
            public void run()
            {
                try {
                    collectLines(p.getErrorStream(), stderr_lines);
                } catch ( IOException e ) {
                    logger.warn(methodName, id, "Cannot read stderr of spawned command:", e);
                }
            }
        });
    stderr_drain.setDaemon(true);       // must never keep the JVM alive on its own
    stderr_drain.start();

    try {
        collectLines(p.getInputStream(), stdout_lines);
        stderr_drain.join();
        return p.waitFor();
    } finally {
        try {
            p.getOutputStream().close();        // we never write to the child; release the pipe
        } catch ( IOException ignored ) {
            // nothing useful can be done about a failed close
        }
    }
}

/**
 * Read a stream to its end, one line at a time, appending every line to a list.  The
 * stream is closed before returning, whether or not reading succeeded.
 *
 * @param in    the stream to read
 * @param lines receives the lines in the order they were read
 * @throws IOException if reading or closing the stream fails
 */
private void collectLines(InputStream in, List<String> lines)
    throws IOException
{
    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
    try {
        String line = null;
        while ( (line = reader.readLine()) != null ) {
            lines.add(line);
        }
    } finally {
        reader.close();
    }
}
/**
 * Stop everything.
 *
 * Marks the service stopped, stops the ping thread and sets the state to Stopping.  For a
 * startable service every implementor is then cancelled in turn and the meta properties
 * are saved; a service that is not startable (CUSTOM ping-only) has nothing more to do.
 */
void stop()
{
    String methodName = "stop";
    logger.debug(methodName, id, "Stopping all implementors");

    this.stopped = true;
    stopPingThread();
    setServiceState(ServiceState.Stopping);

    if ( isStartable() ) {
        for ( DuccId implementor : implementors.keySet() ) {
            stopOneProcess(implementor);
        }
        saveMetaProperties();
    }
    // otherwise: CUSTOM ping-only service, there are no processes to cancel
}
/**
 * Stop at most 'count' implementors, taken in the iteration order of the implementors map.
 *
 * A service that is not startable (CUSTOM ping-only) has only one thing running, so the
 * request is handed to the stop-everything method instead.
 *
 * TODO: Put in logic to stop intelligently, i.e. favor processes not yet running
 *
 * @param count the number of implementors to stop; zero or less stops none
 */
void stop(int count)
{
    String methodName = "stop(count)";

    if ( ! isStartable() ) {
        // CUSTOM ping-only, only one running, let common code do the honors
        stop();
        return;
    }

    logger.debug(methodName, id, "Stopping", count, "implementors");

    int remaining = count;
    for ( DuccId implementor : implementors.keySet() ) {
        if ( remaining <= 0 ) {
            break;
        }
        stopOneProcess(implementor);
        remaining--;
    }

    saveMetaProperties();
}
/**
 * Timer task that performs the delayed stop of a service set.
 *
 * When it fires it clears the set's referenced-start flag, clears the enclosing
 * instance's 'linger' field and then stops the set.
 */
private class LingerTask
    extends TimerTask
{
    // the service set to stop when the timer fires
    ServiceSet sset;

    LingerTask(ServiceSet sset)
    {
        this.sset = sset;
        logger.debug("LingerTask.init", id, "Linger starts", linger_time);
    }

    @Override
    public void run()
    {
        logger.debug("LingerTask.run", id, "Lingering stop completes.");

        // doesn't matter how its started i think, we have to set this flag off when we stop
        sset.setReferencedStart(false);
        linger = null;
        sset.stop();
    }
}
/**
 * Schedule a stop of this service set to run after linger_time has elapsed.
 *
 * The Timer is created on first use.  The scheduled task is remembered in 'linger'.
 */
void lingeringStop()
{
    if ( timer == null ) {
        timer = new Timer();
    }

    LingerTask pending = new LingerTask(this);
    linger = pending;
    timer.schedule(pending, linger_time);
}
/**
 * Build a description of this service from its current fields.
 *
 * The implementor and reference ids are copied into fresh lists.  Queue statistics are
 * filled in only while a ping driver is installed.
 *
 * @return a newly created ServiceDescription
 */
IServiceDescription query()
{
    IServiceDescription sd = new ServiceDescription();

    // copies of the id sets, so the description does not share the live maps
    sd.setImplementors(new ArrayList<ADuccId>(implementors.keySet()));
    sd.setReferences(new ArrayList<ADuccId>(references.keySet()));

    sd.setInstances(getNInstances());

    sd.setType(service_type);
    sd.setSubclass(service_class);
    sd.setEndpoint(endpoint);
    sd.setBroker(broker);
    sd.setServiceState(getServiceState());
    //sd.setJobState(job_state);
    sd.setActive(serviceMeta != null);
    sd.setStopped(stopped);
    sd.setAutostart(autostart);
    sd.setLinger(linger_time);
    sd.setId(id);
    sd.setUser(user);
    sd.setDeregistered(isDeregistered());

    if ( serviceMeta != null ) {
        sd.setQueueStatistics(serviceMeta.getServiceStatistics());
    }

    return sd;
}
/**
 * For debugging, so it's easier to identify this guy in the eclipse debugger.
 *
 * @return the endpoint, exactly as stored
 */
@Override
public String toString()
{
    return this.endpoint;
}
}
/**
For reference
public enum JobState {
Received, // Job has been vetted, persisted, and assigned unique Id
WaitingForDriver, // Process Manager is launching Job Driver
WaitingForServices, // Service Manager is checking/starting services for Job
WaitingForResources, // Scheduler is assigning resources to Job
Initializing, // Process Agents are initializing pipelines
Running, // At least one Process Agent has reported process initialization complete
Completing, // Job processing is completing
Completed, // Job processing is completed
Undefined // None of the above
};
*/