blob: acd3cde75db4de7133fddb829eddba5cf4a0a9fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.sm;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccProperties;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.event.ServiceModifyEvent;
import org.apache.uima.ducc.transport.event.ServiceQueryEvent;
import org.apache.uima.ducc.transport.event.ServiceQueryReplyEvent;
import org.apache.uima.ducc.transport.event.ServiceReplyEvent;
import org.apache.uima.ducc.transport.event.ServiceStartEvent;
import org.apache.uima.ducc.transport.event.ServiceStopEvent;
import org.apache.uima.ducc.transport.event.ServiceUnregisterEvent;
import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;
import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
import org.apache.uima.ducc.transport.event.common.IDuccWork;
import org.apache.uima.ducc.transport.event.sm.IServiceDescription;
import org.apache.uima.ducc.transport.event.sm.ServiceDependency;
import org.apache.uima.ducc.transport.event.sm.ServiceMap;
public class ServiceHandler
implements SmConstants,
Runnable
{
/**
*
*/
private static final long serialVersionUID = 1L;
private DuccLogger logger = DuccLogger.getLogger(ServiceHandler.class.getName(), COMPONENT_NAME);
private IServiceManager serviceManager;
private ServiceStateHandler serviceStateHandler = new ServiceStateHandler();
private ServiceMap serviceMap = new ServiceMap(); // note this is the sync object for publish
private Map<DuccId, IDuccWork> newJobs = new HashMap<DuccId, IDuccWork>();
private Map<DuccId, IDuccWork> newServices = new HashMap<DuccId, IDuccWork>();
private Map<DuccId, IDuccWork> deletedJobs = new HashMap<DuccId, IDuccWork>();
private Map<DuccId, IDuccWork> deletedServices = new HashMap<DuccId, IDuccWork>();
private Map<DuccId, IDuccWork> modifiedJobs = new HashMap<DuccId, IDuccWork>();
private Map<DuccId, IDuccWork> modifiedServices = new HashMap<DuccId, IDuccWork>();
private List<ApiHandler> pendingRequests = new LinkedList<ApiHandler>();
private Object stateUpdateLock = new Object();
public ServiceHandler(IServiceManager serviceManager)
{
this.serviceManager = serviceManager;
}
public synchronized void run()
{
String methodName = "run";
while ( true ) {
try {
wait();
} catch (InterruptedException e) {
logger.error(methodName, null, e);
}
try {
runCommands(); // enqueued orders that came in while I was away
processUpdates();
} catch (Throwable t) {
logger.error(methodName, null, t);
}
}
}
/**
* At boot only ... pass in the set of all known active services to each service so it can update
* internal state with current published state.
*/
void synchronizeImplementors(Map<DuccId, JobState> servicemap)
{
ArrayList<String> keys = serviceStateHandler.getServiceNames();
for ( String k : keys ) {
ServiceSet sset = serviceStateHandler.getServiceByName(k);
sset.synchronizeImplementors(servicemap);
}
}
void processUpdates()
{
String methodName = "processUpdates";
logger.info(methodName, null, "Processing updates.");
Map<DuccId, IDuccWork> deletedJobsMap = new HashMap<DuccId, IDuccWork>();
Map<DuccId, IDuccWork> modifiedJobsMap = new HashMap<DuccId, IDuccWork>();
Map<DuccId, IDuccWork> newJobsMap = new HashMap<DuccId, IDuccWork>();
Map<DuccId, IDuccWork> deletedServicesMap = new HashMap<DuccId, IDuccWork>();
Map<DuccId, IDuccWork> modifiedServicesMap = new HashMap<DuccId, IDuccWork>();
Map<DuccId, IDuccWork> newServicesMap = new HashMap<DuccId, IDuccWork>();
synchronized(stateUpdateLock) {
deletedJobsMap.putAll(deletedJobs);
deletedJobs.clear();
modifiedJobsMap.putAll(modifiedJobs);
modifiedJobs.clear();
deletedServicesMap.putAll(deletedServices);
deletedServices.clear();
modifiedServicesMap.putAll(modifiedServices);
modifiedServices.clear();
newServicesMap.putAll(newServices);
newServices.clear();
newJobsMap.putAll(newJobs);
newJobs.clear();
}
// We could potentially have several updates where a service or arrives, is modified, and then deleted, while
// we are busy. Need to handle them in the right order.
//
// Jobs are dependent on services but not the other way around - I think we need to handle services first,
// to avoid the case where something is dependent on something that will exist soon but doesn't currently.
handleNewServices (newServicesMap );
handleModifiedServices(modifiedServicesMap);
handleDeletedServices (deletedServicesMap );
handleImplicitServices( );
handleNewJobs (newJobsMap );
handleModifiedJobs (modifiedJobsMap );
handleDeletedJobs (deletedJobsMap );
serviceManager.publish(serviceMap);
List<ServiceSet> regsvcs = serviceStateHandler.getRegisteredServices();
for ( ServiceSet sset : regsvcs ) {
sset.enforceAutostart();
}
}
void signalUpdates( // This is the incoming or map, with work split into categories.
// The incoming maps are volatile - must save contents before returning.
HashMap<DuccId, IDuccWork> newJobs,
HashMap<DuccId, IDuccWork> newServices,
HashMap<DuccId, IDuccWork> deletedJobs,
HashMap<DuccId, IDuccWork> deletedServices,
HashMap<DuccId, IDuccWork> modifiedJobs,
HashMap<DuccId, IDuccWork> modifiedServices
)
{
synchronized(stateUpdateLock) {
this.newJobs.putAll(newJobs);
this.newServices.putAll(newServices);
this.deletedJobs.putAll(deletedJobs);
this.deletedServices.putAll(deletedServices);
this.modifiedJobs.putAll(modifiedJobs);
this.modifiedServices.putAll(modifiedServices);
}
synchronized(this) {
notify();
}
}
void runCommands()
{
String methodName = "runCommands";
LinkedList<ApiHandler> tmp = new LinkedList<ApiHandler>();
synchronized(pendingRequests) {
tmp.addAll(pendingRequests);
pendingRequests.clear();
}
logger.info(methodName, null, "Running", tmp.size(), "API Tasks.");
for ( ApiHandler apih : tmp ) {
apih.run();
}
}
void addApiTask(ApiHandler apih)
{
synchronized(pendingRequests) {
pendingRequests.add(apih);
}
}
/**
* Resolves state for the job in id based on the what it is dependent upon - the independent services
*/
protected void resolveState(DuccId id, ServiceDependency dep)
{
String methodName = "resolveState";
Map<String, ServiceSet> services = serviceStateHandler.getServicesForJob(id);
if ( services == null ) {
dep.setState(ServiceState.NotAvailable); // says that nothing i need is available
return;
}
ServiceState state = ServiceState.Available;
//
// Start with the most permissive state and reduce it as we walk the list
// Running > Initializing > Waiting > NotAvailable
//
// This sets the state to the min(all dependent service states)
//
for ( ServiceSet sset : services.values() ) {
if ( sset.getServiceState().ordinality() < state.ordinality() ) state = sset.getServiceState();
dep.setIndividualState(sset.getKey(), sset.getServiceState());
logger.debug(methodName, id, "Set individual state", sset.getServiceState());
}
dep.setState(state);
}
/**
* This is called when an endpoint is referenced as a dependent service from a job or a service.
* It is called only when a new job or service is first discovred in the OR map.
*/
protected Map<String, ServiceSet> resolveDependencies(DuccWorkJob w, ServiceDependency s)
{
String methodName = "resolveDependencies";
DuccId id = w.getDuccId();
String[] deps = w.getServiceDependencies();
// New services, if any are discovered
boolean fatal = false;
Map<String, ServiceSet> jobServices = new HashMap<String, ServiceSet>();
for ( String dep : deps ) {
// put it into the global map of known services if needed and up the ref count
ServiceSet sset = serviceStateHandler.getServiceByName(dep);
if ( sset == null ) { // first time, so it's by reference only
try {
sset = new ServiceSet(dep, serviceManager.newId());
serviceStateHandler.putServiceByName(dep, sset);
} catch ( Exception e ) { // if 'dep' is invalid, or we can't get a duccid, we throw
s.addMessage(dep, e.getMessage());
s.setState(ServiceState.NotAvailable);
fatal = true;
continue;
}
}
if ( sset.isDeregistered() ) {
// Registerered services only - the service might even still be alive because it can
// take a while to get rid of these guys - we need to be sure we don't attach any
// new jobs to it.
s.addMessage(dep, "Independent registered service [" + dep + "] has been deregistered and is terminating.");
s.setState(ServiceState.NotAvailable);
fatal = true;
continue;
}
//
// We try to vet all services so the message is complete. If we've already had some fatal problems
// we need to bypass any attempt to cope with registered services or updating the sset.
//
if ( ! fatal ) {
if ( sset.isRegistered() && (sset.countImplementors() == 0) && sset.isStartable() ) {
// Registered but not alive, well, we can fix that!
int ninstances = sset.getNInstances();
logger.debug(methodName, sset.getId(), "Reference-starting registered service, instances =", ninstances);
if ( ! sset.isAutostart() ) { // must avoid races ith autostart for referenced start
sset.setReferencedStart(true);
}
for ( int i = 0; i < ninstances; i++ ) {
if ( ! sset.start() ) {
s.addMessage(dep, "Can't start independent service.");
s.setState(ServiceState.NotAvailable);
break;
}
}
}
jobServices.put(dep, sset);
sset.reference(id);
serviceStateHandler.putServiceForJob(w.getDuccId(), sset);
logger.debug(methodName, id, "Service init ok. Ref[", dep, "] incr to", sset.countReferences());
}
}
if ( fatal ) {
jobServices.clear();
}
return jobServices;
}
protected void handleNewJobs(Map<DuccId, IDuccWork> work)
{
String methodName = "handleNewJobs";
// Map of updates to send to OR
HashMap<DuccId, ServiceDependency> updates = new HashMap<DuccId, ServiceDependency>();
for ( DuccId id : work.keySet() ) {
DuccWorkJob w = (DuccWorkJob) work.get(id);
if ( !w.isActive() ) {
logger.info(methodName, id, "Bypassing inactive job, state =", w.getStateObject());
continue;
}
ServiceDependency s = new ServiceDependency(); // for the OR
updates.put(id, s);
String[] deps = w.getServiceDependencies();
if ( deps == null ) { // no deps, just mark it running and move on
s.setState(ServiceState.Available);
logger.info(methodName, id, "Added to map, no service dependencies.");
continue;
}
//
// Get dependency references and fire up their state machines
//
Map<String, ServiceSet> jobServices = resolveDependencies(w, s);
for ( ServiceSet sset : jobServices.values() ) {
sset.establish();
}
resolveState(id, s);
logger.info(methodName, id, "Added job to map, with service dependency state.", s.getState());
}
serviceMap.putAll(updates);
}
/**
* A job or service has ended. Here's common code to clean up the dependent services.
* @param id - the id of the job or service that stopped
* @param deps - the services that 'id' was dependent upon
*/
protected void stopDependentServices(DuccId id)
{
String methodName = "stopDependentServices";
Map<String, ServiceSet> deps = serviceStateHandler.getServicesForJob(id);
if ( deps == null ) {
logger.debug(methodName, id, "No dependent services to stop, returning.");
return; // service already deleted, timing issue
}
//
// Bop through all the things job 'id' is dependent upon, and update their refcounts. If
// the refs go to 0 we stop the pinger and sometimes the independent service itself.
//
for ( String dep : deps.keySet() ) {
logger.debug(methodName, id, "Looking up service", dep);
ServiceSet sset = deps.get(dep);
if ( sset == null ) {
throw new IllegalStateException("Null service for " + dep); // sanity check, should never happen
}
int count = sset.dereference(id); // also maybe stops the pinger
logger.debug(methodName, id, "Ref count for", sset.getKey(), "goes down to", count);
if ( count == 0 ) {
if ( sset.isImplicit() ) {
logger.debug(methodName, id, "Removing unreferenced implicit service", dep, "refcount", count);
serviceStateHandler.removeService(dep);
}
if ( sset.isRegistered() && sset.isReferencedStart() ) {
if ( sset.isAutostart() ) { // could have happened after the reference
sset.setReferencedStart(false); // so we don't linger, just reset
} else {
logger.debug(methodName, id, "Stopping reference-started service", dep, "refcount", count);
sset.lingeringStop();
}
}
}
}
// last, indicate that job 'id' has nothing its dependent upon any more
serviceStateHandler.removeServicesForJob(id);
}
protected void handleDeletedJobs(Map<DuccId, IDuccWork> work)
{
String methodName = "handleCompletedJobs";
for ( DuccId id : work.keySet() ) {
DuccWorkJob w = (DuccWorkJob) work.get(id);
String[] deps = w.getServiceDependencies();
if ( deps == null ) { // no deps, just mark it running and move on
logger.info(methodName, id, "No service dependencies, no updates made.");
continue;
}
stopDependentServices(id);
logger.info(methodName, id, "Deleted job from map");
}
serviceMap.removeAll(work.keySet());
}
protected void handleModifiedJobs(Map<DuccId, IDuccWork> work)
{
String methodName = "handleModifiedJobs";
//
// Only look at active jobs. The others will be going away soon and we use
// that time as a grace period to keep the management machinery running in
// case more work comes in in the next few minutes.
//
// Everything is already in the service map so we just update the state.
//
for ( DuccId id : work.keySet() ) {
DuccWorkJob j = (DuccWorkJob) work.get(id);
String[] deps = j.getServiceDependencies();
if ( deps == null ) { // no deps, just mark it running and move on
logger.info(methodName, id, "No service dependencies, no updates made.");
continue;
}
ServiceDependency s = serviceMap.get(id);
if ( j.isFinished() ) {
stopDependentServices(id);
s.setState(ServiceState.NotAvailable);
s.clearMessages();
} else if ( j.isActive() ) {
resolveState(id, s);
}
}
}
protected void handleNewServices(Map<DuccId, IDuccWork> work)
{
String methodName = "handleNewServices";
Map<DuccId, ServiceDependency> updates = new HashMap<DuccId, ServiceDependency>(); // to be added to the service map sent to OR
Map<String, ServiceSet> newservices = new HashMap<String, ServiceSet>(); // to be added to our internal maps in serviceState
for ( DuccId id : work.keySet() ) {
DuccWorkJob w = (DuccWorkJob) work.get(id);
//
// On restart we sometimes get stale stuff that we just ignore.
// What else? Is the the right thing to do?
//
if ( !w.isActive() ) {
logger.info(methodName, id, "Bypassing inactive service, state=", w.getStateObject());
continue;
}
ServiceDependency s = new ServiceDependency();
updates.put(id, s);
String endpoint = w.getServiceEndpoint();
if ( endpoint == null ) { // the job is damaged if this happens
String msg = "No service endpoint. Service cannot be validated.";
logger.warn(methodName, id, msg);
s.addMessage("null", msg); // this is a fatal state always
s.setState(ServiceState.NotAvailable);
continue;
}
String[] deps = w.getServiceDependencies(); // other services this svc depends on
ServiceSet sset = serviceStateHandler.getServiceByName(endpoint);
if ( sset == null ) {
// submitted, we just track but not much else
try {
sset = new ServiceSet(serviceManager.newId(), id, endpoint, deps); // creates a "submitted" service
sset.addImplementor(id, w.getJobState());
serviceStateHandler.putServiceByName(endpoint, sset);
} catch ( Exception e ) {
s.addMessage(endpoint, e.getMessage());
s.setState(ServiceState.NotAvailable);
continue;
}
} else if ( sset.isDeregistered() ) {
s.addMessage(endpoint, "Duplicate endpoint: terminating deregistered service.");
s.setState(ServiceState.NotAvailable);
continue;
} else if ( sset.matches(id) ) {
// TODO: not clear we have to do anything here since establish() below will
// add to the implementors. Be sure to update the check so the
// code in the following 'else' clause is executed correctly though.
// and instance/implementor of our own registered services
sset.addImplementor(id, w.getJobState());
} else {
//
// If the new service is not a registered service, and it is a duplicate of another service
// which isn't registered, we allow it to join the party.
//
// When it joins, it needs to "propmote" the ServiceSet to "Submitted".
//
// a) in the case of "implicit" we don't know enough to many any moral judgements at all
// b) in the case of "submitted" it could be the user is increasing the pool of servers by
// submitting more jobs. Perhaps we would better handle this via modify but for the moment,
// just allow it.
// c) in the case of "registered" we know and manage everything and don't allow it. users must
// use the services modify api to increase or decrease instances.
//
if ( !sset.isRegistered() ) {
sset.addImplementor(id, w.getJobState());
sset.promote(); // we'll do this explicitly as a reminder that it's happening and
// to insure we NEVER promote a registered service (which is actually
// a demotion!).
} else {
String msg = "Duplicate endpoint: Registered service.";
logger.warn(methodName, id, msg);
s.addMessage(endpoint, msg);
s.setState(ServiceState.NotAvailable);
continue;
}
}
// The service is new and unique if we get this far
//
// No deps. Put it in the map and move on.
//
if ( deps == null ) {
logger.info(methodName, id, "Added service to map, no service dependencies. ");
s.setState(ServiceState.Available); // good to go in the OR (the state of things i'm dependent upon)
sset.establish(id, w.getJobState()); // sets my own state based entirely on state of w
continue;
}
Map<String, ServiceSet> jobServices = resolveDependencies(w, s); //
for ( ServiceSet depset : jobServices.values() ) {
depset.establish();
}
resolveState(id, s);
sset.establish(id, w.getJobState());
logger.info(methodName, id, "Added to map, with service dependencies,", s.getState());
}
serviceStateHandler.recordNewServices(newservices);
serviceMap.putAll(updates);
}
//
// We're here because we got OR state for the service that it has stopped running.
// Must clean up.
//
protected void handleDeletedServices(Map<DuccId, IDuccWork> work)
{
String methodName = "handleDeletedServices";
for ( DuccId id : work.keySet() ) {
DuccWorkJob w = (DuccWorkJob) work.get(id);
String endpoint = w.getServiceEndpoint();
logger.info(methodName, id, "Deleted service:", endpoint);
//
// Dereference and maybe stop the services I'm dependent upon
//
if ( w.getServiceDependencies() == null ) {
logger.info(methodName, id, "No service dependencies to update on removal.");
} else {
stopDependentServices(id); // update references, remove implicit services if any
}
if (endpoint == null ) { // probably impossible but lets not chance NPE
logger.warn(methodName, id, "Missing service endpoint, ignoring.");
continue;
}
ServiceSet sset = serviceStateHandler.getServiceByName(endpoint);
// may have been removed already if we saw it go to complet[ed/ing] and it lingered a while anyway, which is usual
if ( sset != null ) {
sset.removeImplementor(id); // also stops the ping thread if it's the last one
}
}
//serviceStateHandler.removeServicesForJobs(work.keySet()); // services we were dependent upon
serviceMap.removeAll(work.keySet()); // and finally the deleted services
// from the published map
}
/**
* The pinger may have died while we weren't looking. Registered services take care
* of themselves from handleModifiedServices, but we know very little about implicit
* services so we walk them all and make their ServiceSet keep them clean.
*/
protected void handleImplicitServices()
{
ArrayList<String> keys = serviceStateHandler.getServiceNames();
for ( String k : keys ) {
ServiceSet sset = serviceStateHandler.getServiceByName(k);
if ( sset.isImplicit() ) {
sset.establish();
}
}
}
protected void handleModifiedServices(Map<DuccId, IDuccWork> work)
{
String methodName = "handleModifiedServices";
//
// This is a specific service process, but not necessarily the whole service.
//
for ( DuccId id : work.keySet() ) {
DuccWorkJob w = (DuccWorkJob) work.get(id);
String endpoint = w.getServiceEndpoint();
if (endpoint == null ) { // probably impossible but lets not chance NPE
logger.info(methodName, id, "Missing service endpoint, ignoring.");
continue;
}
ServiceSet sset = serviceStateHandler.getServiceByName(endpoint);
if ( sset == null ) {
// may have already died and this is just leftover OR publications.
if ( w.isActive() ) { // or maybe we just screwed up!
logger.info(methodName, id, "Got update for active service instance", id.toString(), "but no ServiceSet! Job state:", w.getJobState());
continue;
}
continue;
}
if ( !sset.containsImplementor(id) ) {
logger.info(methodName, id, "Bypassing removed service instance for", endpoint);
continue;
}
ServiceDependency s = serviceMap.get(id);
if ( w.isFinished() ) { // nothing more, just dereference and maybe stop stuff I'm dependent upon
stopDependentServices(id);
s.setState(ServiceState.NotAvailable); // tell orchestrator
} else if ( w.getServiceDependencies() != null ) { // update state from things I'm dependent upon
resolveState(id, s);
}
// See what happened to the instance ...
if ( w.isActive() ) {
// Hard to know for sure, if there are a bunch of instances, some working and some not, how to manage this.
// But this is a state *change* of something, and the something is active, so probably the service is OK now
// if it hadn't been before.
// Need to be cautious here - this will get reset if ANYthing is running. So we could have a bunch
// of live instances and some new ones, where the live ones are ok but for some reason we can't start
// new ones, in which case this gets set too often.
//
// This seems like it would be rare and since we aren't actually pounding restarts (only attempts every
// SM cycle) maybe its ok. The alternative is to track state changes which is added complexity - for
// waht gain, we need to determine with experience.
//
// I suppose the ServiceManagerHandler could easily track the per-process state change - we'd have to
// modify the thing in the map it passes in to show 'before' and 'after' states instead of just passing
// in the DuccWork thing.
//
JobState state = w.getJobState();
if ( state == JobState.Running ) { // only if we confirm it's alive
sset.resetRunFailures();
}
} else {
JobState state = w.getJobState();
if ( state == JobState.Completed ) {
sset.removeImplementor(id);
JobCompletionType jct = w.getCompletionType();
logger.info(methodName, id, "Removing stopped instance from maps: state[", state, "] completion[", jct, "]");
switch ( jct ) {
case EndOfJob:
case CanceledByUser:
case CanceledByAdministrator:
case Undefined:
break;
default:
logger.debug(methodName, id, "RECORDING FAILURE");
// all other cases are errors that contribute to the error count
if ( sset.excessiveRunFailures() ) { // if true, the count is exceeeded, but reset
logger.warn(methodName, null, "Process Failure: " + jct + " Maximum consecutive failures[" + sset.failure_run + "] max [" + sset.failure_max + "]");
} else {
sset.start();
}
break;
}
}
}
// Now factor in cumulative state of the implementors and manage the ping thread as needed
sset.establish(id, w.getJobState());
if ( (sset.getServiceState() == ServiceState.NotAvailable) && (sset.countReferences() == 0) && (sset.countImplementors() == 0) ) {
// this service is now toast. remove from our maps asap to avoid clashes if it gets
// resubmitted before the OR can purge it.
if ( ! sset.isRegistered() ) {
logger.debug(methodName, id, "Removing service", endpoint, "because it died and has no more references.");
serviceStateHandler.removeService(endpoint);
}
serviceStateHandler.removeServicesForJob(id);
}
}
}
/**
* Add in the service dependencies to the query.
*/
void updateServiceQuery(IServiceDescription sd, ServiceSet sset)
{
if ( sset.isRegistered() ) {
//
// The thing may not be running yet / at-all. Pull out the deps from the registration and
// query them individually.
//
String[] deps = sset.getIndependentServices();
if ( deps != null ) {
for ( String dep : deps ) {
ServiceSet independent = serviceStateHandler.getServiceByName(dep);
if ( independent != null ) {
sd.addDependency(dep, independent.getServiceState().decode());
} else {
sd.addDependency(dep, ServiceState.NotAvailable.decode());
}
}
}
} else {
//
// If it's not registered we have to look up all the dependencies of the implementors instead
//
Map<DuccId, JobState> implementors = sset.getImplementors();
for ( DuccId id : implementors.keySet() ) {
Map<String, ServiceSet> deps = serviceStateHandler.getServicesForJob(id); // all the stuff 'id' is dependent upon
if ( deps != null ) {
for ( String s : deps.keySet() ) {
ServiceSet depsvc = deps.get(s);
sd.addDependency(s, depsvc.getServiceState().decode());
}
}
}
}
}
ServiceQueryReplyEvent query(ServiceQueryEvent ev)
{
//String methodName = "query";
long friendly = ev.getFriendly();
String epname = ev.getEndpoint();
ServiceQueryReplyEvent reply = new ServiceQueryReplyEvent();
if (( friendly == -1) && ( epname == null )) {
ArrayList<String> keys = serviceStateHandler.getServiceNames();
for ( String k : keys ) {
ServiceSet sset = serviceStateHandler.getServiceByName(k);
if ( k == null ) continue; // the unlikely event it changed out from under us
IServiceDescription sd = sset.query();
updateServiceQuery(sd, sset);
reply.addService(sd);
}
} else {
ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
if ( sset == null ) {
reply.setMessage("Unknown");
reply.setEndpoint(epname);
reply.setId(friendly);
reply.setReturnCode(false);
} else {
IServiceDescription sd = sset.query();
updateServiceQuery(sd, sset);
reply.addService(sd);
}
}
return reply;
}
ServiceReplyEvent start(ServiceStartEvent ev)
{
//String methodName = "start";
long friendly = ev.getFriendly();
String epname = ev.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
if ( sset == null ) {
return new ServiceReplyEvent(false, "Unknown", epname, friendly);
}
String userin = ev.getUser();
String userout = sset.getUser();
if ( !userin.equals(userout) && !serviceManager.isAdministrator(userin) ) {
return new ServiceReplyEvent(false, "Owned by " + userout, epname, friendly);
}
if ( sset.isRegistered() ) {
int running = sset.countImplementors();
int instances = ev.getInstances();
int registered = sset.getNInstances();
int wanted = 0;
if ( instances == -1 ) {
wanted = Math.max(0, registered - running);
} else {
wanted = instances;
}
if ( wanted == 0 ) {
return new ServiceReplyEvent(true,
"Already has instances[" + running + "] - no additional instances started",
sset.getKey(),
sset.getId().getFriendly());
}
pendingRequests.add(new ApiHandler(ev, this));
// // only start something if we don't have enought already going
// ApiHandler apih = new ApiHandler(ev, this);
// Thread t = new Thread(apih);
// t.start();
return new ServiceReplyEvent(true,
"New instances[" + wanted + "]",
sset.getKey(),
sset.getId().getFriendly());
} else {
return new ServiceReplyEvent(false, "Not registered", sset.getKey(), sset.getId().getFriendly());
}
}
//
// Everything to do this must be vetted before it is called
//
// Start with no instance says: start enough new processes to get up the registered amount
// Start with some instances says: start exactly this many
// If the --save option is included, also update the registration
//
void doStart(long friendly, String epname, int instances, boolean update)
{
//String methodName = "doStart";
ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
int running = sset.countImplementors();
int registered = sset.getNInstances();
int wanted = 0;
if ( instances == -1 ) {
wanted = Math.max(0, registered - running);
} else {
wanted = instances;
}
if ( update ) {
sset.setNInstances(running + instances);
}
sset.resetRunFailures(); // manual start overrides, if there's still a problem
// the service will be stopped soon anyway.
for ( int i = 0; i < wanted; i++ ) {
if ( !sset.start() ) break;
}
}
ServiceReplyEvent stop(ServiceStopEvent ev)
{
long friendly = ev.getFriendly();
String epname = ev.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
if ( sset == null ) {
return new ServiceReplyEvent(false, "Unknown", epname, friendly);
}
String userin = ev.getUser();
String userout = sset.getUser();
if ( !userin.equals(userout) && !serviceManager.isAdministrator(userin) ) {
return new ServiceReplyEvent(false, "Owned by " + userout, epname, friendly);
}
if ( sset.isRegistered() ) {
if ( sset.isStopped() ) {
return new ServiceReplyEvent(false, "Already stopped", sset.getKey(), sset.getId().getFriendly());
}
pendingRequests.add(new ApiHandler(ev, this));
return new ServiceReplyEvent(true, "Stopping", sset.getKey(), sset.getId().getFriendly());
} else {
return new ServiceReplyEvent(false, "Not registered", sset.getKey(), sset.getId().getFriendly());
}
}
//
// Everything to do this must be vetted before it is called
//
// If instances == 0 set stop the whole service
// Otherwise we just stop the number asked for
// If --save is insicated we update the registry
//
void doStop(long friendly, String epname, int instances, boolean update)
{
//String methodName = "doStop";
ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
int running = sset.countImplementors();
int tolose;
if ( instances == -1 ) {
tolose = running;
} else {
tolose = Math.min(instances, running);
}
if ( update ) {
sset.setNInstances(Math.max(0, running - instances)); // never persist < 0 registered instance
}
if ( tolose == running ) {
sset.stop(); // blind stop
} else {
sset.stop(tolose); // selective stop
}
}
ServiceReplyEvent register(DuccId id, String props_filename, String meta_filename, DuccProperties props, DuccProperties meta)
{
String methodName = "register";
String error = null;
boolean must_deregister = false;
ServiceSet sset = null;
try {
sset = new ServiceSet(id, props_filename, meta_filename, props, meta);
} catch (Throwable t) {
error = t.getMessage();
return new ServiceReplyEvent(false, t.getMessage(), "?", id.getFriendly());
}
String key = sset.getKey();
// Check if already registered
ServiceSet sset0 = serviceStateHandler.getServiceByName(key);
if ( sset0 != null ) {
error = ("Duplicate owned by: " + sset0.getUser());
} else {
try {
sset.saveServiceProperties();
} catch ( Exception e ) {
error = ("Internal error; unable to store service descriptor. " + key);
logger.error(methodName, id, e);
must_deregister = true;
}
try {
if ( ! must_deregister ) {
sset.saveMetaProperties();
}
} catch ( Exception e ) {
error = ("Internal error; unable to store service meta-descriptor. " + key);
logger.error(methodName, id, e);
must_deregister = true;
}
// must check for cycles or we can deadlock
if ( ! must_deregister ) {
CycleChecker cc = new CycleChecker(sset);
if ( cc.hasCycle() ) {
error = ("Service dependencies contain a cycle with " + cc.getCycles());
logger.error(methodName, id, error);
must_deregister = true;
}
}
}
if ( error == null ) {
serviceStateHandler.putServiceByName(sset.getKey(), sset);
return new ServiceReplyEvent(true, "Registered", key, id.getFriendly());
} else {
File mf = new File(meta_filename);
mf.delete();
File pf = new File(props_filename);
pf.delete();
return new ServiceReplyEvent(false, error, key, id.getFriendly());
}
}
public ServiceReplyEvent modify(ServiceModifyEvent ev)
{
long friendly = ev.getFriendly();
String epname = ev.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
if ( sset == null ) {
return new ServiceReplyEvent(false, "Unknown", epname, friendly);
}
String userin = ev.getUser();
String userout = sset.getUser();
if ( !userin.equals(userout) && !serviceManager.isAdministrator(userin) ) {
return new ServiceReplyEvent(false, "Owned by " + userout, epname, friendly);
}
if ( sset.isRegistered() ) {
pendingRequests.add(new ApiHandler(ev, this));
// ApiHandler apih = new ApiHandler(ev, this);
// Thread t = new Thread(apih);
// t.start();
return new ServiceReplyEvent(true, "Modifing", sset.getKey(), sset.getId().getFriendly());
} else {
return new ServiceReplyEvent(false, "Not registered", sset.getKey(), sset.getId().getFriendly());
}
}
void doModify(long friendly, String epname, int instances, Trinary autostart, boolean activate)
{
//String methodName = "doStop";
ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
if ( instances > 0 ) {
sset.setNInstances(instances); // also persists instances
if ( activate ) {
int running = sset.countImplementors();
int diff = instances - running;
if ( diff > 0 ) {
while ( diff-- > 0 ) {
if ( !sset.start() ) break;
}
} else if ( diff < 0 ) {
sset.stop(-diff);
}
}
}
if ( autostart != Trinary.Unset ) {
sset.setAutostart(autostart.decode());
if ( activate ) {
sset.enforceAutostart();
}
}
}
public ServiceReplyEvent unregister(ServiceUnregisterEvent ev)
{
long friendly = ev.getFriendly();
String epname = ev.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
if ( sset == null ) {
return new ServiceReplyEvent(false, "Unknown", epname, friendly);
}
String userin = ev.getUser();
String userout = sset.getUser();
if ( !userin.equals(userout) && !serviceManager.isAdministrator(userin) ) {
return new ServiceReplyEvent(false, "Owned by " + userout, epname, friendly);
}
if ( sset.isRegistered() ) {
sset.deregister(); // just sets a flag so we know how to handle it when it starts to die
pendingRequests.add(new ApiHandler(ev, this));
// ApiHandler apih = new ApiHandler(ev, this);
// Thread t = new Thread(apih);
// t.start();
return new ServiceReplyEvent(true, "Shutting down implementors", sset.getKey(), sset.getId().getFriendly());
} else {
return new ServiceReplyEvent(false, "Not registered", sset.getKey(), sset.getId().getFriendly());
}
}
//
// Everything to do this must be vetted before it is called. Run in a new thread to not hold up the API.
//
void doUnregister(long friendly, String epname)
{
String methodName = "doUnregister";
ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
if ( sset.countImplementors() > 0 ) {
logger.debug(methodName, sset.getId(), "Stopping implementors:", friendly, epname);
sset.stop();
} else if ( sset.isPingOnly() ) {
logger.debug(methodName, sset.getId(), "Unregister ping-only setvice:", friendly, epname);
sset.stop();
serviceStateHandler.removeService(epname, friendly);
} else {
logger.debug(methodName, sset.getId(), "Removing from map:", friendly, epname);
serviceStateHandler.removeService(epname, friendly);
}
sset.deleteProperties();
}
/**
* From: http://en.wikipedia.org/wiki/Topological_sorting
*
* L �? Empty list that will contain the sorted elements
* S �? Set of all nodes with no incoming edges
* while S is non-empty do
* remove a node n from S
* insert n into L
* for each node m with an edge e from n to m do
* remove edge e from the graph
* if m has no other incoming edges then
* insert m into S
* if graph has edges then
* return error (graph has at least one cycle)
* else
* return L (a topologically sorted order)
*/
class CycleChecker
{
ServiceSet sset;
int edges = 0;
List<String> cycles = null;
CycleChecker(ServiceSet sset)
{
this.sset = sset;
}
boolean hasCycle()
{
// Start by building the dependency graph
// TODO: Maybe consider saving this. Not clear there's much of a
// gain doing the extra bookeeping beause the graphs will always
// be small and will only need checking on registration or arrival
// of a submitted service. So this cycle checking is always
// fast anyway.
//
// Bookeeping could be a bit ugly because a submitted service could
// bop in and change some dependency graph. We really only care
// for checking cycles, so we'll check the cycles as things change
// and then forget about it.
//
String[] deps = sset.getIndependentServices();
if ( deps == null ) return false; // man, that was fast!
Map<String, ServiceSet> visited = new HashMap<String, ServiceSet>(); // all the nodes in the graph
clearEdges(sset, visited);
List<ServiceSet> nodes = new ArrayList<ServiceSet>();
nodes.addAll(visited.values());
buildGraph(nodes);
List<ServiceSet> sorted = new ArrayList<ServiceSet>(); // topo-sorted list of nodes
List<ServiceSet> current = new ArrayList<ServiceSet>(); // nodes with no incoming edges
// Constant: current has all nodes with no incoming edges
for ( ServiceSet node : nodes ) {
if ( ! node.hasPredecessor() ) current.add(node);
}
while ( current.size() > 0 ) {
ServiceSet next = current.remove(0); // remove a node n from S
sorted.add(next); // insert n int L
List<ServiceSet> successors = next.getSuccessors();
for ( ServiceSet succ : successors ) { // for each node m(pred) with an edge e from n to m do
next.removeSuccessor(succ); // remove edge from graph
succ.removePredecessor(next); // ...
edges--;
if ( !succ.hasPredecessor() ) current.add(succ); // if m(pred) has no incoming edges insert m into S
}
}
if ( edges == 0 ) return false; // if graph has no edges, no cycles
cycles = new ArrayList<String>(); // oops, and here they are
for ( ServiceSet node : nodes ) {
if ( node.hasSuccessor() ) {
for ( ServiceSet succ : node.getSuccessors() ) {
cycles.add(node.getKey() + " -> " + succ.getKey());
}
}
}
return true;
}
String getCycles()
{
return cycles.toString();
}
//
// Traveerse the graph and make sure all the nodes are "clean"
//
void clearEdges(ServiceSet node, Map<String, ServiceSet> visited)
{
String key = node.getKey();
node.clearEdges();
if ( visited.containsKey(key) ) return;
visited.put(node.getKey(), node);
String[] deps = node.getIndependentServices();
if ( deps == null ) return;
for ( String dep : deps ) {
ServiceSet sset = serviceStateHandler.getServiceByName(dep);
if ( sset != null ) {
clearEdges(sset, visited);
}
}
}
void buildGraph(List<ServiceSet> nodes)
{
for ( ServiceSet node : nodes ) {
String[] deps = node.getIndependentServices(); // never null if we get this far
if ( deps != null ) {
for ( String d : deps ) {
ServiceSet outgoing = serviceStateHandler.getServiceByName(d);
if ( outgoing == null ) continue;
outgoing.setIncoming(node);
node.setOutgoing(outgoing);
edges++;
}
}
}
}
}
class ServiceStateHandler
{
// Map of active service descriptors by endpoint. For UIMA services, key is the endpoint.
private Map<String, ServiceSet> servicesByName = new HashMap<String, ServiceSet>();
private Map<Long, ServiceSet> servicesByFriendly = new HashMap<Long, ServiceSet>();
// For each job, the collection of services it is dependent upon
// DUccId is a Job Id (or id for serice that has dependencies)
private Map<DuccId, Map<String, ServiceSet>> servicesByJob = new HashMap<DuccId, Map<String, ServiceSet>>();
ServiceStateHandler()
{
}
/**
* Return a copy of the keys so we can fetch the services in an orderly manner.
*/
synchronized ArrayList<String> getServiceNames()
{
ArrayList<String> answer = new ArrayList<String>();
for ( String k : servicesByName.keySet() ) {
answer.add(k);
}
return answer;
}
synchronized ServiceSet getServiceByName(String n)
{
return servicesByName.get(n);
}
synchronized ServiceSet getServiceByFriendly(long id)
{
return servicesByFriendly.get( id );
}
// API passes in a friendly (maybe) and an endpiont (maybe) but only one of these
// Here we look up the service by whatever was passed in.
synchronized ServiceSet getServiceForApi(long id, String n)
{
if ( n == null ) return getServiceByFriendly(id);
return getServiceByName(n);
}
synchronized List<ServiceSet> getRegisteredServices()
{
ArrayList<ServiceSet> answer = new ArrayList<ServiceSet>();
for ( ServiceSet sset : servicesByName.values() ) {
if ( sset.isRegistered() ) {
answer.add(sset);
}
}
return answer;
}
synchronized void putServiceByName(String n, ServiceSet s)
{
servicesByName.put(n, s);
DuccId id = s.getId();
if ( id != null ) {
servicesByFriendly.put(id.getFriendly(), s);
}
}
synchronized ServiceSet removeService(String n)
{
ServiceSet s = servicesByName.remove(n);
if ( s != null ) {
DuccId id = s.getId();
if ( id != null ) {
servicesByFriendly.remove(id.getFriendly());
}
}
return s;
}
synchronized void removeService(long id)
{
ServiceSet sset = servicesByFriendly.remove(id);
if ( sset != null ) {
String key = sset.getKey();
servicesByName.remove(key);
}
}
synchronized void removeService(String n, long id)
{
if ( n == null ) removeService(id);
else removeService(n);
}
synchronized Map<String, ServiceSet> getServicesForJob(DuccId id)
{
return servicesByJob.get(id);
}
synchronized void putServiceForJob(DuccId id, ServiceSet s)
{
Map<String, ServiceSet> services = servicesByJob.get(id);
if ( services == null ) {
services = new HashMap<String, ServiceSet>();
servicesByJob.put(id, services);
}
services.put(s.getKey(), s);
}
synchronized void removeServicesForJob(DuccId id)
{
servicesByJob.remove(id);
}
synchronized void recordNewServices(Map<String, ServiceSet> services)
{
servicesByName.putAll(services);
}
}
/**
* Tester for topo sorter.
* Input is props file, e.g. for the graph:
* A -> B, A -> C, B -> C:
*
* services = A B C
* svc.A = B C
* svc.B = C
* svc.C =
*
*/
private void runSortTester(String propsfile)
{
int friendly = 1;
DuccProperties props = new DuccProperties();
try {
props.load(propsfile);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.exit(1);
}
String svcnames = props.getStringProperty("services");
String[] svcs = svcnames.split("\\s");
ServiceSet[] allServices = new ServiceSet[svcs.length];
int ndx = 0;
for ( String svc : svcs ) {
svc = svc.trim();
String key = "UIMA-AS:" + svc + ":tcp://foo:123";
ServiceSet dep = serviceStateHandler.getServiceByName(key);
if ( dep == null ) {
dep = new ServiceSet(new DuccId(friendly++), new DuccId(0), key, null);
serviceStateHandler.putServiceByName(key, dep);
allServices[ndx++] = dep;
}
String depnames = props.getStringProperty("svc." + svc);
String[] deps = depnames.split("\\s");
List<String> subdeps = new ArrayList<String>();
for ( String subsvc : deps ) {
subsvc = subsvc.trim();
if ( subsvc.equals("")) continue;
String subkey = "UIMA-AS:" + subsvc + ":tcp://foo:123";
ServiceSet subdep = serviceStateHandler.getServiceByName(subkey);
if ( subdep == null ) {
subdep = new ServiceSet(new DuccId(friendly++), new DuccId(0), subkey, null);
serviceStateHandler.putServiceByName(subkey, subdep);
allServices[ndx++] = subdep;
}
subdeps.add(subkey);
}
if ( subdeps.size() > 0 ) {
dep.setIndependentServices(subdeps.toArray(new String[subdeps.size()]));
}
}
CycleChecker cc = new CycleChecker(allServices[0]);
if ( cc.hasCycle() ) {
System.out.println("Service dependencies contain a cycle with " + cc.getCycles());
} else {
System.out.println("No cycles detected");
}
}
// tester for the topo sorter
public static void main(String[] args)
{
ServiceHandler sh = new ServiceHandler(null);
sh.runSortTester(args[0]);
}
}