blob: 9cddb8b4bd2aed19a8957de8681b567ba1657c21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.sm;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.uima.ducc.cli.DuccServiceApi;
import org.apache.uima.ducc.cli.IUiOptions.UiOption;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.persistence.services.IStateServices;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccProperties;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.event.AServiceRequest;
import org.apache.uima.ducc.transport.event.ServiceDisableEvent;
import org.apache.uima.ducc.transport.event.ServiceEnableEvent;
import org.apache.uima.ducc.transport.event.ServiceIgnoreEvent;
import org.apache.uima.ducc.transport.event.ServiceModifyEvent;
import org.apache.uima.ducc.transport.event.ServiceObserveEvent;
import org.apache.uima.ducc.transport.event.ServiceQueryEvent;
import org.apache.uima.ducc.transport.event.ServiceQueryReplyEvent;
import org.apache.uima.ducc.transport.event.ServiceReplyEvent;
import org.apache.uima.ducc.transport.event.ServiceStartEvent;
import org.apache.uima.ducc.transport.event.ServiceStopEvent;
import org.apache.uima.ducc.transport.event.ServiceUnregisterEvent;
import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
import org.apache.uima.ducc.transport.event.common.IDuccWork;
import org.apache.uima.ducc.transport.event.sm.IService.ServiceState;
import org.apache.uima.ducc.transport.event.sm.IServiceDescription;
import org.apache.uima.ducc.transport.event.sm.ServiceDependency;
import org.apache.uima.ducc.transport.event.sm.ServiceMap;
public class ServiceHandler
implements SmConstants,
Runnable
{
/**
*
*/
private DuccLogger logger = DuccLogger.getLogger(ServiceHandler.class.getName(), COMPONENT_NAME);
private IServiceManager serviceManager;
private ServiceStateHandler serviceStateHandler = new ServiceStateHandler();
private ServiceMap serviceMap = new ServiceMap(); // note this is the sync object for publish
private IStateServices stateHandler;
private Map<DuccId, IDuccWork> newJobs = new HashMap<DuccId, IDuccWork>();
private Map<DuccId, IDuccWork> newServices = new HashMap<DuccId, IDuccWork>();
private Map<DuccId, IDuccWork> deletedJobs = new HashMap<DuccId, IDuccWork>();
private Map<DuccId, IDuccWork> deletedServices = new HashMap<DuccId, IDuccWork>();
private Map<DuccId, IDuccWork> modifiedJobs = new HashMap<DuccId, IDuccWork>();
private Map<DuccId, IDuccWork> modifiedServices = new HashMap<DuccId, IDuccWork>();
private List<ApiHandler> pendingRequests = new LinkedList<ApiHandler>();
private Object stateUpdateLock = new Object();
private Map<String, UiOption> optionMap; // for modify()
public ServiceHandler(IServiceManager serviceManager)
{
this.serviceManager = serviceManager;
Runtime.getRuntime().addShutdownHook(new ServiceShutdown());
DuccServiceApi dsi = new DuccServiceApi(null); // instantiate this to access the modify options
UiOption[] options = dsi.getModifyOptions();
optionMap = new HashMap<String, UiOption>();
for ( UiOption o : options ) {
optionMap.put(o.pname(), o);
}
}
void setStateHandler(IStateServices handler)
{
this.stateHandler = handler;
}
public synchronized void run()
{
String methodName = "run";
while ( true ) {
try {
wait();
} catch (InterruptedException e) {
logger.error(methodName, null, e);
}
try {
runCommands(); // enqueued orders that came in while I was away
processUpdates();
} catch (Throwable t) {
logger.error(methodName, null, t);
}
}
}
/**
* At boot only ... pass in the set of all known active services to each service so it can update
* internal state with current published state.
*/
void bootImplementors(Map<DuccId, DuccWorkJob> incoming)
{
String methodName = "bootImplementors";
for ( DuccId id : incoming.keySet() ) {
DuccWorkJob j = incoming.get(id);
String ep = j.getServiceEndpoint();
ServiceSet sset = serviceStateHandler.getServiceByUrl(ep);
if ( sset == null ) {
// must cancel this service, no idea what it is
} else {
sset.bootImplementor(id, j.getJobState()); // boot by id, job, not known so more stuff
// has to be built up
}
}
List<ServiceSet> services = serviceStateHandler.getServices();
for ( ServiceSet sset : services ) {
try {
sset.bootComplete();
} catch ( Exception e ) {
logger.warn(methodName, sset.getId(), "Error updating meta properties:", e);
}
if ( sset.countImplementors() > 0 ) { // if something was running, let's make sure all the starts are done
sset.start();
}
}
}
void processUpdates()
{
String methodName = "processUpdates";
logger.info(methodName, null, "Processing updates.");
Map<DuccId, IDuccWork> deletedJobsMap = new HashMap<DuccId, IDuccWork>();
Map<DuccId, IDuccWork> modifiedJobsMap = new HashMap<DuccId, IDuccWork>();
Map<DuccId, IDuccWork> newJobsMap = new HashMap<DuccId, IDuccWork>();
Map<DuccId, IDuccWork> deletedServicesMap = new HashMap<DuccId, IDuccWork>();
Map<DuccId, IDuccWork> modifiedServicesMap = new HashMap<DuccId, IDuccWork>();
Map<DuccId, IDuccWork> newServicesMap = new HashMap<DuccId, IDuccWork>();
synchronized(stateUpdateLock) {
deletedJobsMap.putAll(deletedJobs);
deletedJobs.clear();
modifiedJobsMap.putAll(modifiedJobs);
modifiedJobs.clear();
deletedServicesMap.putAll(deletedServices);
deletedServices.clear();
modifiedServicesMap.putAll(modifiedServices);
modifiedServices.clear();
newServicesMap.putAll(newServices);
newServices.clear();
newJobsMap.putAll(newJobs);
newJobs.clear();
}
// We could potentially have several updates where a service or arrives, is modified, and then deleted, while
// we are busy. Need to handle them in the right order.
//
// Jobs are dependent on services but not the other way around - I think we need to handle services first,
// to avoid the case where something is dependent on something that will exist soon but doesn't currently.
handleNewServices (newServicesMap );
handleModifiedServices(modifiedServicesMap);
handleDeletedServices (deletedServicesMap );
handleNewJobs (newJobsMap );
handleModifiedJobs (modifiedJobsMap );
handleDeletedJobs (deletedJobsMap );
List<ServiceSet> regsvcs = serviceStateHandler.getServices();
for ( ServiceSet sset : regsvcs ) {
sset.enforceAutostart();
}
serviceManager.publish(serviceMap);
}
void signalUpdates( // This is the incoming or map, with work split into categories.
// The incoming maps are volatile - must save contents before returning.
HashMap<DuccId, IDuccWork> newJobs,
HashMap<DuccId, IDuccWork> newServices,
HashMap<DuccId, IDuccWork> deletedJobs,
HashMap<DuccId, IDuccWork> deletedServices,
HashMap<DuccId, IDuccWork> modifiedJobs,
HashMap<DuccId, IDuccWork> modifiedServices
)
{
synchronized(stateUpdateLock) {
this.newJobs.putAll(newJobs);
this.newServices.putAll(newServices);
this.deletedJobs.putAll(deletedJobs);
this.deletedServices.putAll(deletedServices);
this.modifiedJobs.putAll(modifiedJobs);
this.modifiedServices.putAll(modifiedServices);
}
synchronized(this) {
notify();
}
}
void runCommands()
{
String methodName = "runCommands";
LinkedList<ApiHandler> tmp = new LinkedList<ApiHandler>();
synchronized(pendingRequests) {
tmp.addAll(pendingRequests);
pendingRequests.clear();
}
logger.info(methodName, null, "Running", tmp.size(), "API Tasks.");
synchronized(this) {
for ( ApiHandler apih : tmp ) {
apih.run();
}
}
}
void addApiTask(ApiHandler apih)
{
synchronized(pendingRequests) {
pendingRequests.add(apih);
}
}
/**
* This is called when an endpoint is referenced as a dependent service from a job or a service.
* It is called only when a new job or service is first discovred in the OR map.
*/
protected Map<String, ServiceSet> resolveDependencies(DuccWorkJob w, ServiceDependency s)
{
//String methodName = "resolveDependencies";
DuccId id = w.getDuccId();
String[] deps = w.getServiceDependencies();
// New services, if any are discovered
boolean fatal = false;
Map<String, ServiceSet> jobServices = new HashMap<String, ServiceSet>();
for ( String dep : deps ) {
// put it into the global map of known services if needed and up the ref count
ServiceSet sset = serviceStateHandler.getServiceByUrl(dep);
if ( sset == null ) {
// Not good. Lets see if it's a terminating service so we can at least tell the poor guy.
sset = serviceStateHandler.getUnregisteredServiceByUrl(dep);
if ( sset == null ) {
// Still null, never h'oid of de guy
s.addMessage(dep, "Service is unknown.");
s.setState(ServiceState.NotAvailable);
} else {
// The service is deregistered but not yet purged, may as well tell him. It can
// take a while for these guys to go away.
s.addMessage(dep, "Service has been deregistered and is terminating.");
s.setState(ServiceState.NotAvailable);
}
fatal = true;
continue;
}
jobServices.put(dep, sset);
}
if ( fatal ) {
jobServices.clear();
} else {
for ( ServiceSet sset : jobServices.values() ) {
// If service is unregistered and then rerigistered while the job is running it may have lost
// its connections, which we insure we always have here.
serviceStateHandler.putServiceForJob(w.getDuccId(), sset);
sset.reference(id); // might start it if it's not running
}
}
return jobServices;
}
/**
* Resolves state for the job in id based on the what it is dependent upon - the independent services
*
* Enter this code ONLY if it is determined that the 'independent' work, 'id', does in fact have
* declared dependencies.
*
* @param id This is the ID of a job or service we want to work out the service state for
* @param dep This is the thing we send to OR telling it about the state of 'id'
*/
protected void resolveState(DuccId id, ServiceDependency dep)
{
Map<Long, ServiceSet> services = serviceStateHandler.getServicesForJob(id);
if ( services == null ) {
dep.setState(ServiceState.NotAvailable); // says that nothing i need is available
return;
}
ServiceState state = ServiceState.Available;
//
// Start with the most permissive state and reduce it as we walk the list
// Running > Initializing > Waiting > NotAvailable
//
// This sets the state to the min(all dependent service states)
//
for ( ServiceSet sset : services.values() ) {
if ( sset.getState().ordinality() < state.ordinality() ) state = sset.getState();
dep.setIndividualState(sset.getKey(), sset.getState());
if ( sset.excessiveFailures() ) {
dep.addMessage(sset.getKey(), sset.getErrorString());
}
// logger.debug(methodName, id, "Set individual state", sset.getState());
}
if ( state.ordinality() < 5 ) { // UIMA-4223, if we got this far, the services all exist but at least one of them
// is not usable. We use this slightly artificial state to insure the OR keeps
// the work WaitingForServices.
state = ServiceState.Pending;
}
dep.setState(state);
}
/**
* A job or service has ended. Here's common code to clean up the dependent services.
* @param id - the id of the job or service that stopped
* @param deps - the services that 'id' was dependent upon
*/
protected void stopDependentServices(DuccId id)
{
String methodName = "stopDependentServices";
Map<Long, ServiceSet> deps = serviceStateHandler.getServicesForJob(id);
if ( deps == null ) {
logger.info(methodName, id, "No dependent services to stop, returning.");
return; // service already deleted, timing issue
}
//
// Bop through all the things job 'id' is dependent upon, and update their refcounts. If
// the refs go to 0 we stop the pinger and sometimes the independent service itself.
//
for ( Long depid : deps.keySet() ) {
logger.debug(methodName, id, "Looking up service", depid);
ServiceSet sset = deps.get(depid);
if ( sset == null ) {
logger.error(methodName, id, "Internal error: Null service for " + depid); // sanity check, should never happen
continue;
}
sset.dereference(id); // also maybe stops the pinger
}
// last, indicate that job 'id' has nothing it's dependent upon any more
serviceStateHandler.removeServicesForJob(id);
}
protected void handleNewJobs(Map<DuccId, IDuccWork> work)
{
String methodName = "handleNewJobs";
// Map of updates to send to OR
HashMap<DuccId, ServiceDependency> updates = new HashMap<DuccId, ServiceDependency>();
for ( DuccId id : work.keySet() ) {
DuccWorkJob w = (DuccWorkJob) work.get(id);
if ( !w.isActive() ) {
logger.info(methodName, id, "Bypassing inactive job, state =", w.getStateObject());
continue;
}
ServiceDependency s = new ServiceDependency(); // for the OR
updates.put(id, s);
String[] deps = w.getServiceDependencies();
if ( deps == null ) { // no deps, just mark it running and move on
s.setState(ServiceState.Available);
logger.info(methodName, id, "Added to map, no service dependencies.");
continue;
}
Map<String, ServiceSet> jobServices = resolveDependencies(w, s);
for ( ServiceSet sset : jobServices.values() ) {
logger.info(methodName, id, "Job is dependent on", sset.getKey());
}
resolveState(id, s);
logger.info(methodName, id, "Added job to map, with service dependency state.", s.getState());
logger.info(methodName, id, s.getMessages());
}
serviceMap.putAll(updates);
}
protected void handleModifiedJobs(Map<DuccId, IDuccWork> work)
{
String methodName = "handleModifiedobs";
//
// Only look at active jobs. The others will be going away soon and we use
// that time as a grace period to keep the management machinery running in
// case more work comes in in the next few minutes.
//
// Everything is already in the service map so we just update the state.
//
for ( DuccId id : work.keySet() ) {
DuccWorkJob j = (DuccWorkJob) work.get(id);
String[] deps = j.getServiceDependencies();
if ( deps == null ) { // no deps, just mark it running and move on
logger.info(methodName, id, "No service dependencies, no updates made.");
continue;
}
ServiceDependency s = serviceMap.get(id);
if ( j.isFinished() ) {
stopDependentServices(id);
s.setState(ServiceState.NotAvailable);
s.clearMessages();
} else if ( j.isActive() ) {
resolveDependencies(j, s);
resolveState(id, s);
}
}
}
protected void handleDeletedJobs(Map<DuccId, IDuccWork> work)
{
String methodName = "handleDeletedobs";
for ( DuccId id : work.keySet() ) {
DuccWorkJob w = (DuccWorkJob) work.get(id);
String[] deps = w.getServiceDependencies();
if ( deps == null ) { // no deps, just mark it running and move on
logger.info(methodName, id, "No service dependencies, no updates made.");
continue;
}
stopDependentServices(id);
logger.info(methodName, id, "Deleted job from map");
}
serviceMap.removeAll(work.keySet());
}
protected void handleNewServices(Map<DuccId, IDuccWork> work)
{
String methodName = "handleNewServices";
Map<DuccId, ServiceDependency> updates = new HashMap<DuccId, ServiceDependency>(); // to be added to the service map sent to OR
for ( DuccId id : work.keySet() ) {
DuccWorkJob w = (DuccWorkJob) work.get(id);
//
// On restart we sometimes get stale stuff that we just ignore.
//
if ( !w.isActive() ) {
logger.info(methodName, id, "Bypassing inactive service, state=", w.getStateObject());
continue;
}
ServiceDependency s = new ServiceDependency();
updates.put(id, s);
String endpoint = w.getServiceEndpoint();
if ( endpoint == null ) { // the job is damaged if this happens
String msg = "No service endpoint. Service cannot be validated.";
logger.warn(methodName, id, msg);
s.addMessage("null", msg); // this is a fatal state always
s.setState(ServiceState.NotAvailable);
continue;
}
String[] deps = w.getServiceDependencies(); // other services this svc depends on
ServiceSet sset = serviceStateHandler.getServiceByImplementor(id.getFriendly());
if ( sset == null ) {
s.addMessage(endpoint, "No registered service for " + endpoint);
s.setState(ServiceState.NotAvailable);
continue;
}
//
// No deps. Put it in the map and move on.
//
if ( deps == null ) {
logger.info(methodName, id, "Added service to map, no service dependencies. ");
s.setState(ServiceState.Available); // good to go in the OR (the state of things i'm dependent upon)
sset.signalUpdate(w);
continue;
}
resolveDependencies(w, s); // check what I depend on and maybe kick 'em
resolveState(id, s); // get cumulative state based on my deps
sset.signalUpdate(w); // kick my own instance
logger.info(methodName, id, "Added to map, with service dependencies,", s.getState());
}
serviceMap.putAll(updates); // for return to OR
}
/**
* The assumption here is that we already had the service instance in our map, and OR is
* delivering an update. That means the instance was known to us in the past, it is not new.
*/
protected void handleModifiedServices(Map<DuccId, IDuccWork> work)
{
String methodName = "handleModifiedServices";
//
// This is a specific service process, but not necessarily the whole service.
//
for ( DuccId id : work.keySet() ) {
DuccWorkJob w = (DuccWorkJob) work.get(id);
String url = w.getServiceEndpoint();
IDuccProcessMap pm = w.getProcessMap();
String node = "<unknown>";
Long share_id = -1L;
if ( pm.size() > 1 ) {
logger.warn(methodName, id, "Process map is too large, should be size 1. Size:", pm.size(), "Cannot determine node or share_id for service.");
} else if ( pm.size() < 1 ) {
logger.warn(methodName, id, "Process map is empty but we are expecting exactly one entry. Cannot determine node or share id for service.");
} else {
for ( DuccId pid : pm.keySet() ) {
NodeIdentity ni = pm.get(pid).getNodeIdentity();
node = ni.getName();
share_id = pid.getFriendly();
}
}
if (url == null ) { // probably impossible but lets not chance NPE
logger.warn(methodName, id, "Missing service endpoint/url, ignoring.");
continue;
}
ServiceSet sset = serviceStateHandler.getServiceByImplementor(id.getFriendly());
if ( sset == null ) {
sset = serviceStateHandler.getUnregisteredServiceByUrl(url);
if ( sset == null ) {
// leftover junk publication maybe? can't tell
logger.info(methodName, id, "Update for active service instance", id.toString(),
"but have no registration for it. Job state:", w.getJobState());
continue;
}
logger.info(methodName, id, "Update for unregistered service, continuing shutdown of service. Job State:", w.getJobState());
}
if ( !sset.containsImplementor(id) ) {
if ( !sset.canDeleteInstance(w) ) {
// the instance isn't dead, this is a possible problem
logger.warn(methodName, id, "sset for", sset.getId(), "does not contain instance");
}
continue; // we don't care any more, he's gone
}
if ( share_id != -1 ) {
sset.updateInstance(id.getFriendly(), share_id, node);
}
ServiceDependency s = serviceMap.get(id);
if ( w.isFinished() ) { // nothing more, just dereference and maybe stop stuff I'm dependent upon
// state Completing or Completed
stopDependentServices(id);
s.setState(ServiceState.NotAvailable); // tell orchestrator
} else if ( w.getServiceDependencies() != null ) { // update state from things I'm dependent upon
resolveDependencies(w, s);
resolveState(id, s);
}
sset.signalUpdate(w);
}
}
protected void handleDeletedServices(Map<DuccId, IDuccWork> work)
{
String methodName = "handleDeletedServices";
for ( DuccId id : work.keySet() ) {
DuccWorkJob w = (DuccWorkJob) work.get(id);
String url = w.getServiceEndpoint();
logger.info(methodName, id, "Instance deleted for", url);
if (url == null ) { // probably impossible but lets not chance NPE
logger.warn(methodName, id, "Missing service endpoint, ignoring.");
continue;
}
//
// Dereference and maybe stop the services I'm dependent upon
//
if ( w.getServiceDependencies() == null ) {
logger.info(methodName, id, "No service dependencies to update on removal.");
} else {
stopDependentServices(id); // update references, remove implicit services if any
}
ServiceSet sset = serviceStateHandler.getServiceByImplementor(id.getFriendly());
if ( sset != null ) { // can happen on unregister
sset.signalUpdate(w);
}
}
serviceMap.removeAll(work.keySet()); // and finally the deleted services
}
/**
* Add in the service dependencies to the query.
*/
void updateServiceQuery(IServiceDescription sd, ServiceSet sset)
{
//
// The thing may not be running yet / at-all. Pull out the deps from the registration and
// query them individually.
//
String[] deps = sset.getIndependentServices();
if ( deps != null ) {
for ( String dep : deps ) {
ServiceSet independent = serviceStateHandler.getServiceByUrl(dep);
if ( independent != null ) {
sd.addDependency(dep, independent.getState().decode());
} else {
sd.addDependency(dep, ServiceState.Stopped.decode());
}
}
}
}
synchronized ServiceReplyEvent query(ServiceQueryEvent ev) // UIMA-4336 Redeclare return type
{
//String methodName = "query";
long id = ev.getFriendly();
String url = ev.getEndpoint();
ServiceQueryReplyEvent reply = new ServiceQueryReplyEvent();
if (( id == -1) && ( url == null )) {
for ( ServiceSet sset : serviceStateHandler.getServices()) {
IServiceDescription sd = sset.query();
updateServiceQuery(sd, sset);
reply.addService(sd);
reply.setReturnCode(true);
}
} else {
ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
reply.setEndpoint(url);
reply.setId(id);
if ( sset == null ) {
reply.setMessage("Unknown service");
reply.setEndpoint(url);
reply.setReturnCode(false);
} else {
IServiceDescription sd = sset.query();
updateServiceQuery(sd, sset);
reply.addService(sd);
reply.setReturnCode(true);
}
}
return reply;
}
boolean authorized(String operation, ServiceSet sset, AServiceRequest req)
{
String methodName = "authorized";
String userin = req.getUser();
String userout = sset.getUser();
if ( userin.equals(userout) ) { // owner is always authorized
logger.info(methodName, sset.getId(), operation, "request from", userin, "allowed.");
return true;
}
if ( serviceManager.isAdministrator(req) ) { // global admin is always authorized
logger.info(methodName, sset.getId(), operation, "request from", userin, "allowed as DUCC administrator. Service owner:", userout);
return true;
}
if ( sset.isAuthorized(userin) ) { // registered co-owner is always authorized
logger.info(methodName, sset.getId(), operation, "request from", userin, "alloed as co-ownder. Service owner:", userout);
return true;
}
logger.info(methodName, sset.getId(), operation, "request from", userin, "not authorized. Service owner:", userout);
return false;
}
synchronized ServiceReplyEvent start(ServiceStartEvent ev)
{
// String methodName = "start";
long id = ev.getFriendly();
String url = ev.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
if ( sset == null ) {
return ServiceManagerComponent.makeResponse(false, "Unknown service", url, id);
}
if ( ! authorized("start", sset, ev) ) {
return ServiceManagerComponent.makeResponse(false, "Owned by " + sset.getUser(), url, sset.getId().getFriendly());
}
int running = sset.countImplementors();
int instances = ev.getInstances();
if ( (instances == -1) && !sset.enabled() ) { // no args always enables
sset.enable();
} else if ( ! sset.enabled() ) {
return ServiceManagerComponent.makeResponse(false, "Service is disabled, cannot start (" + sset.getDisableReason() + ")", url, sset.getId().getFriendly());
}
if ( sset.isDebug() ) {
if ( sset.countImplementors() > 0 ) {
return ServiceManagerComponent.makeResponse(true,
"Already has instances[" + running + "] and service has process_debug set - no additional instances started",
sset.getKey(),
sset.getId().getFriendly());
}
}
int registered = sset.getNInstancesRegistered();
int wanted = 0;
if ( instances == -1 ) {
wanted = Math.max(0, registered - running);
} else {
wanted = instances;
}
if ( wanted == 0 ) {
return ServiceManagerComponent.makeResponse(true,
"Already has instances[" + running + "] - no additional instances started",
sset.getKey(),
sset.getId().getFriendly());
}
pendingRequests.add(new ApiHandler(ev, this));
if ( sset.isDebug() && (wanted > 1) ) {
return ServiceManagerComponent.makeResponse(true,
"Instances adjusted to [1] because process_debug is set",
sset.getKey(),
sset.getId().getFriendly());
} else {
return ServiceManagerComponent.makeResponse(true,
"New instances[" + wanted + "]",
sset.getKey(),
sset.getId().getFriendly());
}
}
//
// Everything to do this must be vetted before it is called
//
// Start with no instance says: start enough new processes to get up the registered amount
// Start with some instances says: start exactly this many
// If the --save option is included, also update the registration
//
void doStart(ServiceStartEvent ev)
{
String methodName = "doStart";
long friendly = ev.getFriendly();
String epname = ev.getEndpoint();
int instances = ev.getInstances();
ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
int running = sset.countImplementors();
int registered = sset.getNInstancesRegistered();
int wanted = 0;
if ( sset.isDebug() ) {
if ( sset.countImplementors() > 0 ) {
logger.warn(methodName, sset.getId(), "Not starting additional instances because process_debug is set.");
return;
}
if ( instances > 1 ) {
logger.warn(methodName, sset.getId(), "Adjusting instances to [1] because process_debug is set.");
instances = 1;
}
}
if ( instances == -1 ) {
wanted = Math.max(0, registered - running);
} else {
wanted = instances;
}
sset.resetRuntimeErrors();
sset.setStarted(); // manual start overrides, if there's still a problem
sset.updateInstances(running + wanted); // pass in target instances
}
synchronized ServiceReplyEvent stop(ServiceStopEvent ev)
{
String methodName = "stop";
long id = ev.getFriendly();
String url = ev.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
if ( sset == null ) {
return ServiceManagerComponent.makeResponse(false, "Unknown service", url, id);
}
if ( ! authorized("stop", sset, ev) ) {
return ServiceManagerComponent.makeResponse(false, "Owned by " + sset.getUser(), url, sset.getId().getFriendly());
}
if ( sset.isStopped() ) {
return ServiceManagerComponent.makeResponse(false, "Already stopped", sset.getKey(), sset.getId().getFriendly());
}
int running = sset.countImplementors();
int instances = ev.getInstances();
int tolose;
String msg;
// CLI/API prevents instances < -1
if ( instances == -1 ) { // figure out n to lose
tolose = running;
msg = "Stopping all deployments.";
} else {
tolose = Math.min(instances, running);
msg = "Stopping " + tolose + " deployments.";
}
logger.info(methodName, sset.getId(), msg);
pendingRequests.add(new ApiHandler(ev, this));
return ServiceManagerComponent.makeResponse(true, msg, sset.getKey(), sset.getId().getFriendly());
}
//
// Everything to do this must be vetted before it is called
//
// If instances == 0 set stop the whole service
// Otherwise we just stop the number asked for
// If --save is insicated we update the registry
//
void doStop(ServiceStopEvent event)
//long id, String url, int instances)
{
//String methodName = "doStop";
int instances = event.getInstances();
long id = event.getFriendly();
String url = event.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
int running = sset.countImplementors();
int tolose;
// CLI/API prevents instances < -1
if ( instances == -1 ) { // figure out n to lose
sset.disableAndStop("Disabled by stop from id " + event.getUser());
} else {
tolose = Math.min(instances, running);
sset.updateInstances(Math.max(0, running - tolose)); // pass in target intances running
}
}
synchronized ServiceReplyEvent disable(ServiceDisableEvent ev)
{
String methodName = "disable";
long id = ev.getFriendly();
String url = ev.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
if ( sset == null ) {
return ServiceManagerComponent.makeResponse(false, "Unknown service", url, id);
}
if ( ! authorized("disable", sset, ev) ) {
return ServiceManagerComponent.makeResponse(false, "Owned by " + sset.getUser(), url, sset.getId().getFriendly());
}
if ( !sset.enabled() ) {
return ServiceManagerComponent.makeResponse(true, "Service is already disabled", sset.getKey(), sset.getId().getFriendly());
}
sset.disable("Disabled by owner or administrator " + ev.getUser());
try {
sset.updateMetaProperties();
} catch ( Exception e ) {
logger.warn(methodName, sset.getId(), "Error updating meta properties:", e);
}
return ServiceManagerComponent.makeResponse(true, "Disabled", sset.getKey(), sset.getId().getFriendly());
}
synchronized ServiceReplyEvent enable(ServiceEnableEvent ev)
{
String methodName = "enable";
long id = ev.getFriendly();
String url = ev.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
if ( sset == null ) {
return ServiceManagerComponent.makeResponse(false, "Unknown service", url, id);
}
if ( ! authorized("enable", sset, ev) ) {
return ServiceManagerComponent.makeResponse(false, "Owned by " + sset.getUser(), url, sset.getId().getFriendly());
}
if ( sset.enabled() ) {
return ServiceManagerComponent.makeResponse(true, "Service is already enabled", sset.getKey(), sset.getId().getFriendly());
}
sset.enable();
try {
sset.updateMetaProperties();
} catch ( Exception e ) {
logger.warn(methodName, sset.getId(), "Error updating meta properties:", e);
}
return ServiceManagerComponent.makeResponse(true, "Enabled.", sset.getKey(), sset.getId().getFriendly());
}
synchronized ServiceReplyEvent ignore(ServiceIgnoreEvent ev)
{
long id = ev.getFriendly();
String url = ev.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
if ( sset == null ) {
return ServiceManagerComponent.makeResponse(false, "Unknown service", url, id);
}
if ( ! authorized("ignore", sset, ev) ) {
return ServiceManagerComponent.makeResponse(false, "Owned by " + sset.getUser(), url, sset.getId().getFriendly());
}
if ( sset.isAutostart() ) {
return ServiceManagerComponent.makeResponse(false, "Service is autostarted, ignore-references not applied.", sset.getKey(), sset.getId().getFriendly());
}
if ( !sset.isReferencedStart() ) {
return ServiceManagerComponent.makeResponse(true, "Service is already ignoring references", sset.getKey(), sset.getId().getFriendly());
}
if ( sset.countImplementors() == 0 ) {
return ServiceManagerComponent.makeResponse(false, "Cannot ignore references, service is not running.", sset.getKey(), sset.getId().getFriendly());
}
sset.ignoreReferences();
return ServiceManagerComponent.makeResponse(true, "References now being ignored.", sset.getKey(), sset.getId().getFriendly());
}
synchronized ServiceReplyEvent observe(ServiceObserveEvent ev)
{
long id = ev.getFriendly();
String url = ev.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
if ( sset == null ) {
return ServiceManagerComponent.makeResponse(false, "Unknown service", url, id);
}
if ( ! authorized("observe", sset, ev) ) {
return ServiceManagerComponent.makeResponse(false, "Owned by " + sset.getUser(), url, sset.getId().getFriendly());
}
if ( sset.isAutostart() ) {
return ServiceManagerComponent.makeResponse(false, "Must set autostart off before enabling reference-starts.", sset.getKey(), sset.getId().getFriendly());
}
if ( sset.countImplementors() == 0 ) {
return ServiceManagerComponent.makeResponse(false, "Cannot observe references, service is not running.", sset.getKey(), sset.getId().getFriendly());
}
sset.observeReferences();
return ServiceManagerComponent.makeResponse(true, "Observing references.", sset.getKey(), sset.getId().getFriendly());
}
synchronized ServiceReplyEvent register(DuccId id, DuccProperties props, DuccProperties meta, boolean isRecovered)
{
String methodName = "register";
String error = null;
boolean must_deregister = false;
String url = meta.getProperty("endpoint");
ServiceSet sset = serviceStateHandler.getServiceByUrl(url);
if (sset != null ) {
error = "Duplicate registered by " + sset.getUser();
return ServiceManagerComponent.makeResponse(false, error, url, sset.getId().getFriendly());
}
try {
sset = new ServiceSet(this, this.stateHandler, id, props, meta);
} catch (Throwable t) {
// throws because endpoint is not parsable
error = t.getMessage();
return ServiceManagerComponent.makeResponse(false, error, url, id.getFriendly());
}
try {
// if it's a "fresh" reservation it must go into the db. otherwise it is already
// in the db and doesn't need to be inserted
sset.storeProperties(isRecovered);
} catch ( Exception e ) {
error = ("Internal error; unable to store service descriptor. " + url);
logger.error(methodName, id, e);
}
// must check for cycles or we can deadlock
if ( ! must_deregister ) {
// TODO R2, revive the cycle checker
// CycleChecker cc = new CycleChecker(sset);
// if ( cc.hasCycle() ) {
// error = ("Service dependencies contain a cycle with " + cc.getCycles());
// logger.error(methodName, id, error);
// must_deregister = true;
// }
}
if ( error == null ) {
serviceStateHandler.registerService(id.getFriendly(), url, sset);
return ServiceManagerComponent.makeResponse(true, "Registered", url, id.getFriendly());
} else {
return ServiceManagerComponent.makeResponse(false, error, url, id.getFriendly());
}
}
synchronized ServiceReplyEvent modify(ServiceModifyEvent ev)
{
long id = ev.getFriendly();
String url = ev.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
if ( sset == null ) {
return ServiceManagerComponent.makeResponse(false, "Unknown service", url, id);
}
if ( ! authorized("modify", sset, ev) ) {
return ServiceManagerComponent.makeResponse(false, "Owned by " + sset.getUser(), url, sset.getId().getFriendly());
}
pendingRequests.add(new ApiHandler(ev, this));
return ServiceManagerComponent.makeResponse(true, "Modify accepted:", sset.getKey(), sset.getId().getFriendly());
}
boolean restart_pinger = false;
boolean restart_service = false;
void modifyRegistration(ServiceSet sset, UiOption option, String value)
{
int intval = 0;
boolean boolval = false;
// TODO: this case covers ALL service options, but note that only those in the modify list
// in the CLI are actually used. Eventually we will cover them all.
switch ( option ) {
case Instances:
intval = Integer.parseInt(value);
sset.updateRegisteredInstances(intval);
break;
case Autostart:
boolval = Boolean.parseBoolean(value);
sset.setAutostart(boolval);
break;
case Administrators:
sset.setJobProperty(option.pname(), value);
sset.parseAdministrators(value);
break;
// For the moment, these all update the registration but don't change internal
// operation.
case Description:
case LogDirectory:
case Jvm:
case ProcessJvmArgs:
case Classpath:
case SchedulingClass:
case Environment:
case ProcessMemorySize:
case ProcessExecutable:
case ProcessExecutableArgs:
case ServiceDependency:
case ProcessInitializationTimeMax:
case WorkingDirectory:
sset.setJobProperty(option.pname(), value);
break;
case InstanceInitFailureLimit:
sset.updateInitFailureLimit(value);
sset.setJobProperty(option.pname(), value);
break;
case ServiceLinger:
sset.updateLinger(value);
sset.setJobProperty(option.pname(), value);
break;
case ProcessDebug:
// Note this guy updates the props differently based on the value
sset.updateDebug(value); // value may be numeric, or "off"
break;
case ServicePingArguments:
case ServicePingClasspath:
case ServicePingJvmArgs:
case ServicePingTimeout:
case ServicePingDoLog:
case ServicePingClass:
case InstanceFailureWindow:
case InstanceFailureLimit:
if ( value.equals("default") ) {
sset.deleteJobProperty(option.pname());
} else {
sset.setJobProperty(option.pname(), value);
}
restart_pinger = true;
break;
default:
// In case a deprecated option such as classpath_order slips through
break;
}
}
//void doModify(long id, String url, int instances, Trinary autostart, boolean activate)
void doModify(ServiceModifyEvent sme)
{
String methodName = "doModify";
long id = sme.getFriendly();
String url = sme.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
DuccProperties mods = sme.getProperties();
restart_pinger = false;
restart_service = false;
boolean updateMeta = false;
Set<String> keys = mods.stringPropertyNames();
for (String kk : keys ) {
UiOption k = optionMap.get(kk);
if ( k == null ) {
logger.debug(methodName, sset.getId(), "Bypass property", kk);
continue;
}
switch ( k ) {
case Help:
case Debug:
case Modify:
// used by CLI only, won't even be passed in
continue;
case Autostart:
updateMeta = true; // UIMA-4928 (Should move it to the svc props)
default:
}
String v = (String) mods.get(kk);
try {
modifyRegistration(sset, k, v);
} catch ( Throwable t ) {
logger.error(methodName, sset.getId(), "Modify", kk, "to", v, "Failed:", t);
continue;
}
logger.info(methodName, sset.getId(), "Modify", kk, "to", v, "restart_service[" + restart_service + "]", "restart_pinger[" + restart_pinger + "]");
}
sset.resetRuntimeErrors();
try {
sset.updateSvcProperties();
if (updateMeta) {
sset.updateMetaProperties();
}
} catch (Exception e) {
logger.error(methodName, sset.getId(), "Cannot store properties:", e);
}
if ( restart_pinger ) {
sset.restartPinger();
restart_pinger = false;
}
// restart_service - not yet
}
synchronized ServiceReplyEvent unregister(ServiceUnregisterEvent ev)
{
//String methodName = "unregister";
long id = ev.getFriendly();
String url = ev.getEndpoint();
ServiceSet sset = serviceStateHandler.getServiceForApi(id, url);
if ( sset == null ) {
return ServiceManagerComponent.makeResponse(false, "Unknown service", url, id);
}
id = sset.getId().getFriendly(); // must insure the ev has the numeric id because we work entirely with that from now ow
url = sset.getKey(); // also insure url is there for messages
ev.setEndpoint(url);
ev.setFriendly(id);
if ( ! authorized("unregister", sset, ev) ) {
return ServiceManagerComponent.makeResponse(false, "Owned by " + sset.getUser(), url, sset.getId().getFriendly());
}
serviceStateHandler.unregister(sset);
sset.deregister(); // just sets a flag so we know how to handle it when it starts to die
pendingRequests.add(new ApiHandler(ev, this));
return ServiceManagerComponent.makeResponse(true, "Shutting down implementors", sset.getKey(), sset.getId().getFriendly());
}
//
// Everything to do this must be vetted before it is called. Run in a new thread to not hold up the API.
//
void doUnregister(ServiceUnregisterEvent ev)
{
String methodName = "doUnregister";
long friendly = ev.getFriendly();
String url = ev.getEndpoint();
ServiceSet sset = serviceStateHandler.getUnregisteredService(friendly);
if ( sset == null ) {
logger.error(methodName, null, "Service", friendly, "(" + url + ") is not a known, unregistereed service. No action taken.");
return;
}
sset.disableAndStop("Disabled by unregister from id " + ev.getUser());
if ( sset.isPingOnly() ) {
logger.info(methodName, sset.getId(), "Unregister ping-only setvice:", friendly, url);
serviceStateHandler.removeService(sset);
try {
sset.deleteProperties();
} catch (Exception e) {
logger.error(methodName, sset.getId(), "Cannot delete service from DB:", e);
}
} else if ( sset.countImplementors() > 0 ) {
logger.debug(methodName, sset.getId(), "Stopping implementors:", friendly, url);
} else {
logger.debug(methodName, sset.getId(), "Removing from map:", friendly, url);
sset.clearQueue(); // will call removeServices if everything looks ok
}
}
void addInstance(ServiceSet sset, ServiceInstance inst)
{
serviceStateHandler.addImplementorFor(sset, inst);
}
void removeImplementor(ServiceSet sset, ServiceInstance inst)
{
serviceStateHandler.removeImplementorFor(sset, inst);
}
void removeService(ServiceSet sset)
{
String methodName = "deleteService";
if ( serviceStateHandler.hasService(sset.getId()) ) {
logger.error(methodName, sset.getId(), "Attempt to delete service while it is still registered: refused.");
} else {
serviceStateHandler.removeService(sset);
}
}
/**
* From: http://en.wikipedia.org/wiki/Topological_sorting
*
* L Empty list that will contain the sorted elements
* S Set of all nodes with no incoming edges
* while S is non-empty do
* remove a node n from S
* insert n into L
* for each node m with an edge e from n to m do
* remove edge e from the graph
* if m has no other incoming edges then
* insert m into S
* if graph has edges then
* return error (graph has at least one cycle)
* else
* return L (a topologically sorted order)
*/
// class CycleChecker
// {
// ServiceSet sset;
// int edges = 0;
// List<String> cycles = null;
// CycleChecker(ServiceSet sset)
// {
// this.sset = sset;
// }
// boolean hasCycle()
// {
// // Start by building the dependency graph
// // TODO: Maybe consider saving this. Not clear there's much of a
// // gain doing the extra bookeeping beause the graphs will always
// // be small and will only need checking on registration or arrival
// // of a submitted service. So this cycle checking is always
// // fast anyway.
// //
// // Bookeeping could be a bit ugly because a submitted service could
// // bop in and change some dependency graph. We really only care
// // for checking cycles, so we'll check the cycles as things change
// // and then forget about it.
// //
// String[] deps = sset.getIndependentServices();
// if ( deps == null ) return false; // man, that was fast!
// Map<String, ServiceSet> visited = new HashMap<String, ServiceSet>(); // all the nodes in the graph
// clearEdges(sset, visited);
// List<ServiceSet> nodes = new ArrayList<ServiceSet>();
// nodes.addAll(visited.values());
// buildGraph(nodes);
// List<ServiceSet> sorted = new ArrayList<ServiceSet>(); // topo-sorted list of nodes
// List<ServiceSet> current = new ArrayList<ServiceSet>(); // nodes with no incoming edges
// // Constant: current has all nodes with no incoming edges
// for ( ServiceSet node : nodes ) {
// if ( ! node.hasPredecessor() ) current.add(node);
// }
// while ( current.size() > 0 ) {
// ServiceSet next = current.remove(0); // remove a node n from S
// sorted.add(next); // insert n int L
// List<ServiceSet> successors = next.getSuccessors();
// for ( ServiceSet succ : successors ) { // for each node m(pred) with an edge e from n to m do
// next.removeSuccessor(succ); // remove edge from graph
// succ.removePredecessor(next); // ...
// edges--;
// if ( !succ.hasPredecessor() ) current.add(succ); // if m(pred) has no incoming edges insert m into S
// }
// }
// if ( edges == 0 ) return false; // if graph has no edges, no cycles
// cycles = new ArrayList<String>(); // oops, and here they are
// for ( ServiceSet node : nodes ) {
// if ( node.hasSuccessor() ) {
// for ( ServiceSet succ : node.getSuccessors() ) {
// cycles.add(node.getKey() + " -> " + succ.getKey());
// }
// }
// }
// return true;
// }
// String getCycles()
// {
// return cycles.toString();
// }
// //
// // Traveerse the graph and make sure all the nodes are "clean"
// //
// void clearEdges(ServiceSet node, Map<String, ServiceSet> visited)
// {
// String key = node.getKey();
// node.clearEdges();
// if ( visited.containsKey(key) ) return;
// visited.put(node.getKey(), node);
// String[] deps = node.getIndependentServices();
// if ( deps == null ) return;
// for ( String dep : deps ) {
// ServiceSet sset = serviceStateHandler.getServiceByName(dep);
// if ( sset != null ) {
// clearEdges(sset, visited);
// }
// }
// }
// void buildGraph(List<ServiceSet> nodes)
// {
// for ( ServiceSet node : nodes ) {
// String[] deps = node.getIndependentServices(); // never null if we get this far
// if ( deps != null ) {
// for ( String d : deps ) {
// ServiceSet outgoing = serviceStateHandler.getServiceByName(d);
// if ( outgoing == null ) continue;
// outgoing.setIncoming(node);
// node.setOutgoing(outgoing);
// edges++;
// }
// }
// }
// }
// }
/**
* This is the shutdown hook that stops all the pingers.
*/
class ServiceShutdown
extends Thread
{
ServiceShutdown()
{
System.out.println("Setting shutdown hook");
}
public void run()
{
System.out.println("Running shutdown hook");
List<ServiceSet> allServices = serviceStateHandler.getServices();
for (ServiceSet sset : allServices) {
sset.stopMonitor();
}
try {
stateHandler.shutdown();
} catch ( Exception e ) {
logger.warn("ServicShutdown.run", null, "Error closing database: ", e);
}
}
}
class ServiceStateHandler
{
// // Map of active service descriptors by endpoint. For UIMA services, key is the endpoint.
private Map<String, ServiceSet> registeredServicesByUrl = new HashMap<String, ServiceSet>();
private Map<Long, ServiceSet> registeredServicesById = new HashMap<Long, ServiceSet>();
private Map<Long, ServiceSet> unregisteredServicesById = new HashMap<Long, ServiceSet>();
private Map<String, ServiceSet> unregisteredServicesByUrl = new HashMap<String, ServiceSet>();
private Map<Long, ServiceSet> servicesByImplementor = new HashMap<Long, ServiceSet>();
// private Map<Long, ServiceSet> servicesByFriendly = new HashMap<Long, ServiceSet>();
// // For each job, the collection of services it is dependent upon
// // DUccId is a Job Id (or id for serice that has dependencies)
private Map<DuccId, Map<Long, ServiceSet>> servicesByJob = new HashMap<DuccId, Map<Long, ServiceSet>>();
// ServiceStateHandler()
// {
// }
// /**
// * Return a copy of the keys so we can fetch the services in an orderly manner.
// */
// synchronized ArrayList<String> getServiceNames()
// {
// ArrayList<String> answer = new ArrayList<String>();
// for ( String k : servicesByName.keySet() ) {
// answer.add(k);
// }
// return answer;
// }
synchronized void unregister(ServiceSet sset)
{
String methodName = "ServiceStateHandler.unregister";
String key = sset.getKey();
long fid = sset.getId().getFriendly();
logger.info(methodName, sset.getId(), "Removing", key, fid);
registeredServicesByUrl.remove(key);
registeredServicesById.remove(fid);
unregisteredServicesById.put(fid, sset);
unregisteredServicesByUrl.put(key, sset);
}
// synchronized ServiceSet getUnregisteredService(String url)
// {
// return unRegisteredServicesByUrl.get(url);
// }
synchronized boolean hasService(DuccId id)
{
String methodName = "ServiceStateHandler.hasService";
logger.info(methodName, null, "containsKey", id, registeredServicesById.containsKey(id.getFriendly()));
return registeredServicesById.containsKey(id.getFriendly());
}
synchronized void registerService(Long id, String ep, ServiceSet sset)
{
String methodName = "ServiceStateHandler.registerService";
logger.info(methodName, sset.getId(), "adding", ep, id);
registeredServicesByUrl.put(ep, sset);
registeredServicesById.put(id, sset);
}
synchronized ServiceSet getServiceByUrl(String n)
{
return registeredServicesByUrl.get(n);
}
synchronized List<ServiceSet> getServices()
{
ArrayList<ServiceSet> answer = new ArrayList<ServiceSet>();
for ( ServiceSet sset : registeredServicesByUrl.values() ) {
answer.add(sset);
}
return answer;
}
synchronized void addImplementorFor(ServiceSet sset, ServiceInstance inst)
{
servicesByImplementor.put(inst.getId(), sset);
}
synchronized ServiceSet getServiceByImplementor(long id)
{
return servicesByImplementor.get(id);
}
synchronized void removeImplementorFor(ServiceSet sset, ServiceInstance inst)
{
servicesByImplementor.remove(inst.getId());
}
// synchronized ServiceSet getServiceByFriendly(long id)
// {
// return servicesByFriendly.get( id );
// }
// API passes in a friendly (maybe) and an endpiont (maybe) but only one of these
// Here we look up the service by whatever was passed in.
synchronized ServiceSet getServiceForApi(long id, String n)
{
if ( n == null ) return registeredServicesById.get(id);
return registeredServicesByUrl.get(n);
}
synchronized ServiceSet getUnregisteredService(long id)
{
return unregisteredServicesById.get(id);
}
synchronized ServiceSet getUnregisteredServiceByUrl(String url)
{
return unregisteredServicesByUrl.get(url);
}
// synchronized void putServiceByName(String n, ServiceSet s)
// {
// servicesByName.put(n, s);
// DuccId id = s.getId();
// if ( id != null ) {
// servicesByFriendly.put(id.getFriendly(), s);
// }
// }
synchronized void removeService(ServiceSet sset)
{
String key = sset.getKey();
long id = sset.getId().getFriendly();
unregisteredServicesById.remove(id);
unregisteredServicesByUrl.remove(key);
// The registeredServices need to have been removed during unregister which is the only way
// to get rid of a service.
Long[] implids = sset.getImplementors();
for ( long l : implids ) {
servicesByImplementor.remove(l);
}
DuccId[] refids = sset.getReferences();
for ( DuccId rid : refids) {
servicesByJob.remove(rid);
}
}
// synchronized void removeService(long id)
// {
// ServiceSet sset = servicesByFriendly.remove(id);
// if ( sset != null ) {
// String key = sset.getKey();
// servicesByName.remove(key);
// }
// }
// synchronized void removeService(String n, long id)
// {
// if ( n == null ) removeService(id);
// else removeService(n);
// }
synchronized Map<Long, ServiceSet> getServicesForJob(DuccId id)
{
return servicesByJob.get(id);
}
synchronized void putServiceForJob(DuccId id, ServiceSet s)
{
Map<Long, ServiceSet> services = servicesByJob.get(id);
if ( services == null ) {
services = new HashMap<Long, ServiceSet>();
servicesByJob.put(id, services);
}
services.put(s.getId().getFriendly(), s);
}
synchronized void removeServicesForJob(DuccId id)
{
servicesByJob.remove(id);
}
// synchronized void recordNewServices(Map<String, ServiceSet> services)
// {
// servicesByName.putAll(services);
// }
}
// tester for the topo sorter
public static void main(String[] args)
{
}
}