| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.uima.ducc.sm; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.NavigableSet; |
| import java.util.Properties; |
| import java.util.UUID; |
| |
| import org.apache.camel.CamelContext; |
| import org.apache.uima.ducc.cli.IUiOptions.UiOption; |
| import org.apache.uima.ducc.common.NodeIdentity; |
| import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties; |
| import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties.DaemonName; |
| import org.apache.uima.ducc.common.component.AbstractDuccComponent; |
| import org.apache.uima.ducc.common.crypto.Crypto; |
| import org.apache.uima.ducc.common.db.DbHelper; |
| import org.apache.uima.ducc.common.head.DuccHead; |
| import org.apache.uima.ducc.common.head.IDuccHead; |
| import org.apache.uima.ducc.common.head.IDuccHead.DuccHeadTransition; |
| import org.apache.uima.ducc.common.main.DuccService; |
| import org.apache.uima.ducc.common.persistence.services.IStateServices; |
| import org.apache.uima.ducc.common.persistence.services.IStateServices.AccessMode; |
| import org.apache.uima.ducc.common.persistence.services.IStateServices.SvcMetaProps; |
| import org.apache.uima.ducc.common.persistence.services.StateServicesDirectory; |
| import org.apache.uima.ducc.common.persistence.services.StateServicesFactory; |
| import org.apache.uima.ducc.common.persistence.services.StateServicesSet; |
| import org.apache.uima.ducc.common.utils.DuccCollectionUtils; |
| import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapDifference; |
| import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapValueDifference; |
| import org.apache.uima.ducc.common.utils.DuccLogger; |
| import org.apache.uima.ducc.common.utils.DuccProperties; |
| import org.apache.uima.ducc.common.utils.IDuccLoggerComponents.Daemon; |
| import org.apache.uima.ducc.common.utils.MissingPropertyException; |
| import org.apache.uima.ducc.common.utils.SystemPropertyResolver; |
| import org.apache.uima.ducc.common.utils.Version; |
| import org.apache.uima.ducc.common.utils.id.DuccId; |
| import org.apache.uima.ducc.common.utils.id.IDuccIdFactory; |
| import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher; |
| import org.apache.uima.ducc.transport.event.AServiceRequest; |
| import org.apache.uima.ducc.transport.event.DaemonDuccEvent; |
| import org.apache.uima.ducc.transport.event.DuccEvent.EventType; |
| import org.apache.uima.ducc.transport.event.ServiceDisableEvent; |
| import org.apache.uima.ducc.transport.event.ServiceEnableEvent; |
| import org.apache.uima.ducc.transport.event.ServiceIgnoreEvent; |
| import org.apache.uima.ducc.transport.event.ServiceModifyEvent; |
| import org.apache.uima.ducc.transport.event.ServiceObserveEvent; |
| import org.apache.uima.ducc.transport.event.ServiceQueryEvent; |
| import org.apache.uima.ducc.transport.event.ServiceRegisterEvent; |
| import org.apache.uima.ducc.transport.event.ServiceReplyEvent; |
| import org.apache.uima.ducc.transport.event.ServiceStartEvent; |
| import org.apache.uima.ducc.transport.event.ServiceStopEvent; |
| import org.apache.uima.ducc.transport.event.ServiceUnregisterEvent; |
| import org.apache.uima.ducc.transport.event.SmHeartbeatDuccEvent; |
| import org.apache.uima.ducc.transport.event.SmStateDuccEvent; |
| import org.apache.uima.ducc.transport.event.common.DuccWorkJob; |
| import org.apache.uima.ducc.transport.event.common.DuccWorkMap; |
| import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType; |
| import org.apache.uima.ducc.transport.event.common.IDuccWork; |
| import org.apache.uima.ducc.transport.event.common.IDuccWorkMap; |
| import org.apache.uima.ducc.transport.event.common.IDuccWorkService; |
| import org.apache.uima.ducc.transport.event.sm.IService.Trinary; |
| import org.apache.uima.ducc.transport.event.sm.ServiceMap; |
| |
| |
| /** |
| * This is the logical "main". The framework instantiates it and calls the (inherited) start() method. |
| * Start() establishes the class in its own thread and fires up the Handler thread. From then on it |
| * is a conduit between the Handler and messages to/from the outside world. |
| */ |
| public class ServiceManagerComponent |
| extends AbstractDuccComponent |
| implements IServiceManager, |
| SmConstants, |
| Runnable |
| { |
| |
| /** |
| * |
| */ |
| private static DuccLogger logger = DuccLogger.getLogger(ServiceManagerComponent.class.getCanonicalName(), COMPONENT_NAME); |
| private static DuccId jobid = null; |
| |
| private IDuccWorkMap localMap = null; |
| |
| private DuccEventDispatcher eventDispatcher; |
| private String stateEndpoint; |
| private String stateChangeEndpoint; |
| |
| private ServiceHandler handler = null; |
| private IStateServices stateHandler = null; |
| |
| //HashMap<String, BaseUimaAsService> services = new HashMap<String, BaseUimaAsService>(); |
| |
| static int meta_ping_rate = 60000; // interval in ms to ping the service |
| static int meta_ping_stability = 5; // number of missed pings before we mark the service down |
| static int meta_ping_timeout = 500; // timeout on ping |
| static String default_ping_class; |
| |
| static int init_failure_max = 1; // total |
| static int failure_max = 5; // total in window |
| static int failure_window = 30; // window size in minutes |
| |
| private IDuccIdFactory idFactory = null; |
| |
| private boolean signature_required = true; |
| private boolean initialized = false; |
| private boolean testmode = false; |
| private boolean orchestrator_alive = false; |
| |
| private Map<String, String> administrators = new HashMap<String, String>(); |
| |
| private IDuccHead dh = null; |
| |
| // Local SM version |
| // 1.1.0 - reworked SM |
| // 1.1.3 - added shutdown hook, pinger last-use, pinger disable autostart |
| // 1.1.4 - dynamic mod of all registration parms. Add debug and max-init-time parms. |
| // 1.1.0 - resync with release, sigh. |
| // 2.0.0 - Update for new release. |
| private String version = "2.1.0"; |
| |
| public ServiceManagerComponent(CamelContext context) |
| { |
| super("ServiceManager", context); |
| this.localMap = new DuccWorkMap(); |
| handler = new ServiceHandler(this); |
| } |
| |
| public DuccLogger getLogger() |
| { |
| return logger; |
| } |
| |
| /* |
| * resume: this head node has become master |
| */ |
| private void resume(IDuccWorkMap dwm) { |
| String methodName = "resume"; |
| try { |
| logger.info(methodName, jobid, ""); |
| handler.init(); |
| init(); |
| this.localMap = dwm; |
| handler.resume(this.localMap); |
| } |
| catch(Exception e) { |
| logger.error(methodName, jobid, e); |
| } |
| } |
| |
| /* |
| * quiesce: this head node has become backup |
| */ |
| private void quiesce() { |
| String methodName = "quiesce"; |
| try { |
| logger.info(methodName, jobid, ""); |
| handler.quiesce(); |
| } |
| catch(Exception e) { |
| logger.error(methodName, jobid, e); |
| } |
| } |
| |
| /** |
| * Initialization tasks: |
| * - read all the service descriptors |
| * - ping them and update their state |
| */ |
| void init() |
| throws Exception |
| { |
| String methodName = "init"; |
| |
| // recover the registry |
| StateServicesDirectory all = stateHandler.getStateServicesDirectory(); |
| NavigableSet<Long> svcs = all.getDescendingKeySet(); |
| |
| for ( Long l : svcs ) { |
| StateServicesSet sss = all.get(l); |
| DuccProperties svcprops = sss.get(IStateServices.svc); |
| DuccProperties metaprops = sss.get(IStateServices.meta); |
| |
| int friendly = 0; |
| String uuid = ""; |
| try { |
| // these gets will throw if the requisite objects aren't found |
| friendly = metaprops.getIntProperty("numeric_id"); |
| uuid = metaprops.getStringProperty("uuid"); |
| } catch (MissingPropertyException e1) { |
| // Ugly, but shouldn't have to be fatal |
| logger.error(methodName, null, "Cannot restore DuccId for service", l, "Friendly id:", friendly, "uuid:", uuid); |
| continue; |
| } |
| |
| System.out.println("Meta id " + metaprops.get("meta_dbid")); |
| System.out.println("Svc id " + metaprops.get("svc_dbid")); |
| DuccId id = new DuccId(friendly); |
| id.setUUID(UUID.fromString(uuid)); |
| logger.debug(methodName, id, "Unique:", id.getUnique()); |
| |
| try { |
| handler.register(id, svcprops, metaprops, true); |
| } catch (IllegalStateException e ) { // happens on duplicate service |
| logger.error(methodName, id, e.getMessage()); // message has all I need. |
| } |
| |
| } |
| |
| idFactory = ServiceManagerHelper.getDuccIdFactory(); |
| |
| synchronized(this) { |
| initialized = true; |
| } |
| } |
| |
| // UIMA-4336 Construct the response as a beany thing. |
| static ServiceReplyEvent makeResponse(boolean rc, String message, String endpoint, long id) |
| { |
| ServiceReplyEvent ret = new ServiceReplyEvent(); |
| ret.setReturnCode(rc); |
| ret.setMessage(message); |
| ret.setEndpoint(endpoint); |
| ret.setId(id); |
| return ret; |
| } |
| |
| void readAdministrators() |
| { |
| String methodName = "readAdministrators"; |
| File adminfile = new File(System.getProperty("DUCC_HOME") + "/resources/ducc.administrators"); |
| if ( ! adminfile.exists() ) { |
| logger.info(methodName, null, "No ducc administrators found."); |
| return; |
| } |
| |
| Properties props = null; |
| try { |
| FileInputStream fis = new FileInputStream(adminfile); |
| props = new Properties(); |
| props.load(fis); |
| } catch (Exception e) { |
| logger.warn(methodName, null, "Cannot read administroators file:", e.toString()); |
| return; |
| } |
| |
| for ( Object k : props.keySet() ) { |
| String adm = ((String) k).trim(); |
| administrators.put(adm, adm); |
| logger.info(methodName, null, "DUCC Administrator registered:", adm); |
| } |
| } |
| |
| /** |
| * Tell Orchestrator about state change for recording into system-events.log |
| */ |
| private void stateChange(EventType eventType) { |
| String methodName = "stateChange"; |
| try { |
| Daemon daemon = Daemon.ServicesManager; |
| NodeIdentity nodeIdentity = new NodeIdentity(); |
| DaemonDuccEvent ev = new DaemonDuccEvent(daemon, eventType, nodeIdentity); |
| eventDispatcher.dispatch(stateChangeEndpoint, ev, ""); |
| logger.info(methodName, null, stateChangeEndpoint, eventType.name(), nodeIdentity.getCanonicalName()); |
| } |
| catch(Exception e) { |
| logger.error(methodName, null, e); |
| } |
| } |
| |
| @Override |
| public void start(DuccService service, String[] args) throws Exception |
| { |
| String methodName = "start"; |
| super.start(service, args); |
| DuccDaemonRuntimeProperties.getInstance().boot(DaemonName.ServiceManager,getProcessJmxUrl()); |
| |
| init_failure_max = SystemPropertyResolver.getIntProperty("ducc.sm.init.failure.limit" , init_failure_max); |
| failure_max = SystemPropertyResolver.getIntProperty("ducc.sm.instance.failure.limit" , failure_max); |
| failure_window = SystemPropertyResolver.getIntProperty("ducc.sm.instance.failure.window" , failure_window); |
| |
| meta_ping_rate = SystemPropertyResolver.getIntProperty("ducc.sm.meta.ping.rate" , meta_ping_rate); |
| meta_ping_timeout = SystemPropertyResolver.getIntProperty("ducc.sm.meta.ping.timeout" , meta_ping_timeout); |
| meta_ping_stability = SystemPropertyResolver.getIntProperty("ducc.sm.meta.ping.stability" , meta_ping_stability); |
| default_ping_class = SystemPropertyResolver.getStringProperty("ducc.sm.default.monitor.class", UimaAsPing.class.getCanonicalName()); |
| |
| String rm = SystemPropertyResolver.getStringProperty("ducc.runmode", ""); |
| if ( rm.equals("Test") ) testmode = true; |
| |
| String sig = SystemPropertyResolver.getStringProperty("ducc.signature.required", "on"); |
| signature_required = true; |
| if ( sig.equals("on") ) signature_required = true; |
| else if ( sig.equals("off") ) signature_required = false; |
| else { |
| logger.warn(methodName, null, "Incorrect value for property ducc.signature.required: " + sig + ". Setting to default of \"on\""); |
| } |
| |
| logger.info(methodName, null, "---------------------------- NEW -----------------------------------------------------"); |
| logger.info(methodName, null, "Service Manager starting:"); |
| logger.info(methodName, null, " DUCC home : ", System.getProperty("DUCC_HOME")); |
| logger.info(methodName, null, " ActiveMQ URL : ", System.getProperty("ducc.broker.url")); |
| logger.info(methodName, null, ""); |
| logger.info(methodName, null, " JVM : ", System.getProperty("java.vendor") + |
| " "+ System.getProperty("java.version")); |
| logger.info(methodName, null, " JAVA_HOME : ", System.getProperty("java.home")); |
| logger.info(methodName, null, " JVM Path : ", System.getProperty("ducc.jvm")); |
| logger.info(methodName, null, " JMX URL : ", System.getProperty("ducc.jmx.url")); |
| logger.info(methodName, null, ""); |
| logger.info(methodName, null, " OS Architecture : ", System.getProperty("os.arch")); |
| logger.info(methodName, null, " Crypto enabled : ", signature_required); |
| logger.info(methodName, null, ""); |
| logger.info(methodName, null, " Test mode enabled : ", testmode); |
| logger.info(methodName, null, ""); |
| logger.info(methodName, null, " Service ping rate : ", meta_ping_rate); |
| logger.info(methodName, null, " Service ping timeout : ", meta_ping_timeout); |
| logger.info(methodName, null, " Service ping stability : ", meta_ping_stability); |
| logger.info(methodName, null, " Default ping class : ", default_ping_class); |
| logger.info(methodName, null, ""); |
| logger.info(methodName, null, " database enabled : ", DbHelper.isDbEnabled()); |
| logger.info(methodName, null, " database implementation : ", System.getProperty("ducc.service.persistence.impl")); |
| logger.info(methodName, null, ""); |
| logger.info(methodName, null, " Init Failure Max : ", init_failure_max); |
| logger.info(methodName, null, " Instance Failure Max : ", failure_max); |
| logger.info(methodName, null, " Instance Failure Window : ", failure_window); |
| logger.info(methodName, null, ""); |
| logger.info(methodName, null, " DUCC Version : ", Version.version()); |
| logger.info(methodName, null, " SM Version : ", version); |
| logger.info(methodName, null, "------------------------------------------------------------------------------------"); |
| |
| readAdministrators(); |
| |
| stateHandler = StateServicesFactory.getInstance(this.getClass().getCanonicalName(), COMPONENT_NAME); |
| |
| // // String dbname = System.getProperty("ducc.db.name"); |
| // String dburl = System.getProperty("ducc.state.database.url"); // "remote:localhost:2424/DuccState" |
| |
| // try { |
| // // verify, and possibly set up the schema if it's the first time |
| // databaseHandler = new DbManager(dburl); |
| // databaseHandler.init(); |
| // } catch (Throwable e) { |
| // logger.fatal(methodName, null, "Cannot create database at", dburl, ":", e); |
| // Runtime.getRuntime().halt(1); |
| // } |
| |
| // if ( databaseHandler == null ) { |
| // logger.error(methodName, null, "Cannot open database at", dburl); |
| // } else { |
| // logger.info(methodName, null, "Opened database at", dburl); |
| // } |
| handler.setStateHandler(stateHandler); |
| |
| // Here is a good place to do any pre-start stuff |
| |
| // Start the main processing loop |
| Thread smThread = new Thread(this); |
| smThread.setName("ServiceManagerHandler"); |
| smThread.setDaemon(true); |
| smThread.start(); |
| |
| Thread handlerThread = new Thread(handler); |
| handlerThread.setName("ServiceHandler"); |
| handlerThread.setDaemon(true); |
| handlerThread.start(); |
| stateChange(EventType.BOOT); |
| } |
| |
| public void stop() throws Exception { |
| stateChange(EventType.SHUTDOWN); |
| super.stop(); |
| } |
| |
| public void run() |
| { |
| String methodName = "run"; |
| |
| logger.info(methodName, null, "Starting Service Manager"); |
| try { |
| init(); |
| runSm(); |
| } catch ( Throwable t ) { |
| logger.error(methodName, null, t); |
| } |
| logger.info(methodName, null, "Service Manger returns."); |
| } |
| |
| public boolean isAdministrator(AServiceRequest ev) |
| { |
| // must be in the list, and have asked nicely as well |
| return administrators.containsKey(ev.getUser()) && (ev.asAdministrator()); |
| } |
| |
| /** |
| * At boot only ... pass in the set of all known active services to each service so it can update |
| * internal state with current published state. |
| */ |
| public synchronized void bootHandler(IDuccWorkMap work) |
| { |
| Map<DuccId, DuccWorkJob> services = new HashMap<DuccId, DuccWorkJob>(); |
| for ( Object o : work.values() ) { |
| IDuccWork w = (IDuccWork) o; |
| if ( w.getDuccType() != DuccType.Service ) continue; |
| DuccWorkJob j = (DuccWorkJob) w; |
| if ( !j.isActive() ) continue; |
| services.put(j.getDuccId(), j); |
| } |
| handler.bootImplementors(services); |
| } |
| |
| void diffCommon(IDuccWork l, IDuccWork r, HashMap<DuccId, IDuccWork> modifiedJobs, HashMap<DuccId, IDuccWork> modifiedServices) |
| { |
| String methodName = "diffCommon"; |
| if ( l.getDuccType() == DuccType.Reservation ) return; |
| |
| if ( l.getDuccType() == DuccType.Pop ) { |
| logger.trace(methodName, l.getDuccId(), "BOTH: GOT A POP:", l.getDuccId()); |
| } |
| |
| if ( l.getStateObject() != r.getStateObject() ) { |
| String serviceType = "/ Job"; |
| switch ( l.getDuccType() ) { |
| case Service: |
| case Pop: |
| switch ( ((IDuccWorkService)l).getServiceDeploymentType() ) |
| { |
| case uima: |
| case custom: |
| serviceType = "/ Service"; |
| break; |
| case other: |
| serviceType = "/ ManagedReservation"; |
| default: |
| break; |
| } |
| break; |
| default: |
| break; |
| } |
| logger.trace(methodName, l.getDuccId(), "Reconciling", l.getDuccType(), serviceType, "incoming state = ", l.getStateObject(), " my state = ", r.getStateObject()); |
| } |
| |
| // Update our own state by replacing the old (right) object with the new (left) |
| switch(l.getDuccType()) { |
| case Job: |
| modifiedJobs.put(l.getDuccId(), l); |
| localMap.addDuccWork(l); |
| break; |
| |
| case Service: |
| localMap.addDuccWork(l); |
| switch ( ((IDuccWorkService)l).getServiceDeploymentType() ) |
| { |
| case uima: |
| case custom: |
| modifiedServices.put(l.getDuccId(), l); |
| break; |
| case other: |
| modifiedJobs.put(l.getDuccId(), l); |
| default: |
| break; |
| } |
| break; |
| |
| default: |
| break; |
| } |
| } |
| |
| |
| /** |
| * Split the incoming work into new, deleted, and needs update. This runs under the |
| * incoming camel thread so don't do anything timeconsuming here. |
| * |
| * Also maintain the local workMap so we can diff. |
| * |
| * Runs on the incoming thread, do not do anything blocking or timecomsuming here. |
| */ |
| public synchronized void processIncoming(IDuccWorkMap workMap) |
| { |
| String methodName = "processIncoming"; |
| |
| |
| HashMap<DuccId, IDuccWork> newJobs = new HashMap<DuccId, IDuccWork>(); |
| HashMap<DuccId, IDuccWork> newServices = new HashMap<DuccId, IDuccWork>(); |
| |
| HashMap<DuccId, IDuccWork> deletedJobs = new HashMap<DuccId, IDuccWork>(); |
| HashMap<DuccId, IDuccWork> deletedServices = new HashMap<DuccId, IDuccWork>(); |
| |
| HashMap<DuccId, IDuccWork> modifiedJobs = new HashMap<DuccId, IDuccWork>(); |
| HashMap<DuccId, IDuccWork> modifiedServices = new HashMap<DuccId, IDuccWork>(); |
| |
| logger.info(methodName, null, "===== Orchestrator State Arrives ====="); |
| |
| // try { |
| // ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("/home/challngr/for/jerry/working/incomingWorkMap.obj")); |
| // oos.writeObject(workMap); |
| // oos.close(); |
| |
| // oos = new ObjectOutputStream(new FileOutputStream("/home/challngr/for/jerry/working/existingWorkMap.obj")); |
| // oos.writeObject(localMap); |
| // oos.close(); |
| // } catch ( Throwable t ) { |
| // logger.error(methodName, null, t); |
| // } |
| |
| @SuppressWarnings("unchecked") |
| DuccMapDifference<DuccId, IDuccWork> diffmap = DuccCollectionUtils.difference(workMap, localMap); |
| |
| for ( Object o : workMap.values() ) { |
| IDuccWork w = (IDuccWork) o; |
| logger.trace(methodName, w.getDuccId(), w.getDuccType(), "Arrives in state =", w.getStateObject()); |
| // if ( w.getDuccId().getFriendly() == 204 ) { |
| // int a = 1; |
| // a++; |
| // } |
| } |
| |
| // Stuff on the left is new |
| Map<DuccId, IDuccWork> work = diffmap.getLeft(); |
| for ( IDuccWork w : work.values() ) { |
| |
| logger.trace(methodName, w.getDuccId(), "Calculating diffs on left side.", w.getDuccId()); |
| if ( w.getDuccType() == DuccType.Reservation ) continue; |
| |
| if ( w.getDuccType() == DuccType.Pop ) { |
| logger.trace(methodName, w.getDuccId(), "NEW: GOT A POP:", w.getDuccId()); |
| } |
| |
| if ( !((DuccWorkJob)w).isActive() ) continue; // not active, we don't care about it. likely after restart. |
| |
| logger.trace(methodName, w.getDuccId(), "Reconciling, adding", w.getDuccType()); |
| switch(w.getDuccType()) { |
| case Job: |
| localMap.addDuccWork(w); |
| newJobs.put(w.getDuccId(), w); |
| break; |
| |
| case Service: |
| localMap.addDuccWork(w); |
| // An arbitrary process is **almost** the same as a service in terms of how most of DUCC |
| // handles it. To me (SM), however, it is just like any other job so it goes into |
| // the job map. |
| switch ( ((IDuccWorkService)w).getServiceDeploymentType() ) |
| { |
| case uima: |
| case custom: |
| newServices.put(w.getDuccId(), w); |
| break; |
| case other: |
| newJobs.put(w.getDuccId(), w); |
| default: |
| break; |
| } |
| |
| break; |
| |
| default: |
| break; |
| } |
| } |
| |
| // Stuff on the right is stuff we have but OR doesn't |
| work = diffmap.getRight(); |
| for ( IDuccWork w : work.values() ) { |
| logger.trace(methodName, w.getDuccId(), "Doing diffs on right"); |
| if ( w.getDuccType() == DuccType.Reservation ) continue; |
| |
| if ( w.getDuccType() == DuccType.Pop ) { |
| logger.trace(methodName, w.getDuccId(), "DELETED: GOT A POP:", w.getDuccId()); |
| } |
| |
| logger.debug(methodName, w.getDuccId(), "Reconciling, deleting instance of type ", w.getDuccType()); |
| switch(w.getDuccType()) { |
| case Job: |
| localMap.removeDuccWork(w.getDuccId()); |
| deletedJobs.put(w.getDuccId(), w); |
| break; |
| |
| case Service: |
| localMap.removeDuccWork(w.getDuccId()); |
| switch ( ((IDuccWorkService)w).getServiceDeploymentType() ) |
| { |
| case uima: |
| case custom: |
| deletedServices.put(w.getDuccId(), w); |
| break; |
| case other: |
| deletedJobs.put(w.getDuccId(), w); |
| default: |
| break; |
| } |
| break; |
| |
| default: |
| break; |
| } |
| } |
| |
| // NOTE: 2014-07-14 There is some sort of bug in the equals() method on DuccWork so it incorrectly |
| // identifies work as having difference when it doesn't. As a result this code was originnaly |
| // written under mistaken assumptions of what the map difference returns. Untkl the owner of |
| // the DuccWork object have worked out a correct equals(), we run the intersection on ALL |
| // intersecting objects, whether they differ in state or do not; hence the two |
| // loops below on the diffmap iterator and on diffmap.getCommon() |
| // |
| // |
| // Now: stuff we both know about. Here is stuff that is in both maps the the map diff identifies as |
| // having state differences. |
| // |
| for( DuccMapValueDifference<IDuccWork> jd: diffmap ) { |
| IDuccWork r = jd.getRight(); |
| IDuccWork l = jd.getLeft(); |
| |
| logger.trace(methodName, r.getDuccId(), "Doing diffs on middle A:", r.getDuccId(), l.getDuccId()); |
| |
| diffCommon(l, r, modifiedJobs, modifiedServices); |
| } |
| |
| // |
| // Common stuff - in both maps the the state diff identifies as haveing no state differences. |
| // |
| work = diffmap.getCommon(); |
| for( DuccId k : work.keySet()) { |
| IDuccWork r = (IDuccWork) localMap.get(k); |
| IDuccWork l = (IDuccWork) workMap.get(k); |
| |
| logger.trace(methodName, r.getDuccId(), "Doing diffs on middle B:", r.getDuccId(), l.getDuccId()); |
| |
| diffCommon(l, r, modifiedJobs, modifiedServices); |
| } |
| |
| handler.signalUpdates( |
| newJobs, |
| newServices, |
| deletedJobs, |
| deletedServices, |
| modifiedJobs, |
| modifiedServices |
| ); |
| } |
| |
| /** |
| * Publish the map, called by the ServiceHandler. |
| */ |
| public void publish(ServiceMap map) |
| { |
| String methodName = "publish"; |
| try { |
| SmStateDuccEvent ev = new SmStateDuccEvent(); |
| logger.info(methodName, null, "Publishing State, active job count =", map.size()); |
| if (logger.isDebug()) { |
| logger.info(methodName, null, map.toPrint()); |
| } |
| ev.setServiceMap(map); |
| eventDispatcher.dispatch(stateEndpoint, ev, ""); // tell the world what is scheduled (note empty string) |
| } catch (Throwable t) { |
| logger.error(methodName, null, t); |
| } |
| } |
| |
| /** |
| * Publish a heartbeat. |
| */ |
| public void publish() |
| { |
| String methodName = "publish"; |
| try { |
| SmHeartbeatDuccEvent ev = new SmHeartbeatDuccEvent(); |
| logger.info(methodName, null, "Publishing Heartbeat"); |
| eventDispatcher.dispatch(stateEndpoint, ev, ""); // tell the world SM is alive |
| } catch (Throwable t) { |
| logger.error(methodName, null, t); |
| } |
| } |
| |
| public void setTransportConfiguration(DuccEventDispatcher eventDispatcher, String stateEndpoint, String stateChangeEndpoint) |
| { |
| this.eventDispatcher = eventDispatcher; |
| this.stateEndpoint = stateEndpoint; |
| this.stateChangeEndpoint = stateChangeEndpoint; |
| } |
| |
| int epochCounter = 0; |
| IDuccWorkMap incomingMap = null; |
| public synchronized void runSm() |
| { |
| String methodName = "runSm"; |
| boolean first_update = true; |
| |
| while ( true ) { |
| |
| try { |
| wait(); |
| } catch (InterruptedException e) { |
| logger.info(methodName, null, "SM wait interrupted, executing out-of-band epoch."); |
| } |
| |
| try { |
| if ( first_update ) { |
| bootHandler(incomingMap); |
| first_update = false; |
| } |
| processIncoming(incomingMap); |
| } catch (Throwable e1) { |
| logger.fatal(methodName, null, e1); |
| } |
| |
| } |
| } |
| |
| private IDuccHead getDuccHead() { |
| if(dh == null) { |
| dh = DuccHead.getInstance(); |
| if(dh.is_ducc_head_backup()) { |
| stateChange(EventType.INIT_AS_BACKUP); |
| } |
| else { |
| stateChange(EventType.INIT_AS_MASTER); |
| } |
| } |
| return dh; |
| } |
| |
| public synchronized void orchestratorStateArrives(IDuccWorkMap map) |
| { |
| String methodName = "orchestratorStateArrives"; |
| try { |
| DuccHeadTransition transition = getDuccHead().transition(); |
| switch(transition) { |
| case master_to_master: |
| logger.debug(methodName, jobid, "ducc head == master"); |
| break; |
| case master_to_backup: |
| logger.warn(methodName, jobid, "ducc head -> backup"); |
| stateChange(EventType.SWITCH_TO_BACKUP); |
| handler.setAccessMode(AccessMode.RO); |
| quiesce(); |
| break; |
| case backup_to_master: |
| logger.warn(methodName, jobid, "ducc head -> master"); |
| stateChange(EventType.SWITCH_TO_MASTER); |
| handler.setAccessMode(AccessMode.RW); |
| resume(map); |
| break; |
| case backup_to_backup: |
| logger.debug(methodName, jobid, "ducc head == backup"); |
| break; |
| default: |
| logger.debug(methodName, jobid, "ducc head == unspecified"); |
| break; |
| } |
| |
| if ( ! initialized ) { |
| logger.info(methodName, null, "SM not initialized, ignoring Orchestrator state update."); |
| return; |
| } |
| else { |
| logger.debug(methodName, null, "SM initialized."); |
| } |
| |
| if ( ! map.isJobDriverMinimalAllocateRequirementMet() ) { |
| logger.info(methodName, null, "Orchestrator JD node not assigned, ignoring Orchestrator state update."); |
| // send a heartbeat to anyone who cares |
| publish(); |
| return; |
| } |
| else { |
| logger.debug(methodName, null, "JD requirements met."); |
| } |
| |
| orchestrator_alive = true; |
| epochCounter++; |
| incomingMap = map; |
| notify(); |
| } |
| catch(Exception e) { |
| logger.error(methodName, jobid, e); |
| } |
| } |
| |
| // @deprecated |
| static String serviceFileLocation() |
| { |
| return System.getProperty("DUCC_HOME") + "/state/services"; |
| } |
| |
| static String serviceHistoryLocation() |
| { |
| return System.getProperty("DUCC_HOME") + "/history/services-registry/"; |
| } |
| |
| private boolean check_signature(String user, byte[] auth_block) |
| throws Throwable |
| { |
| Crypto crypto = new Crypto(user); |
| return crypto.isValid(auth_block); |
| } |
| |
| private boolean validate_ducc_head(String action, AServiceRequest req) { |
| String methodName = "validate_ducc_head"; |
| boolean retVal = true; |
| if(dh.is_ducc_head_virtual_master()) { |
| // all is well |
| } |
| else { |
| String reason = "not master node"; |
| logger.warn(methodName, null, action + " rejected. " + reason); |
| req.setReply(makeResponse(false, reason, action, -1)); |
| retVal = false; |
| } |
| return retVal; |
| } |
| |
| private boolean validate_user(String action, AServiceRequest req) |
| { |
| String methodName = "validate_user"; |
| |
| // First check that request is from a compatible cli |
| if (req.getCliVersion() != CliVersion.getVersion()) { |
| String reason = "Incompatible CLI request using version " + req.getCliVersion() |
| + " while DUCC expects version " + CliVersion.getVersion(); |
| logger.warn(methodName, null, action + " rejected. " + reason); |
| req.setReply(makeResponse(false, reason, action, -1)); |
| return false; |
| } |
| |
| String user = req.getUser(); |
| byte[] auth_block= req.getAuth(); |
| boolean validated = false; |
| |
| if ( ! signature_required ) return true; |
| |
| try { |
| validated = check_signature(user, auth_block); |
| } catch ( Throwable t ) { |
| logger.error(methodName, null, "Crypto failure:", t.toString()); |
| } |
| |
| if ( ! validated ) { |
| logger.warn(methodName, null, "User", user, "cannot be validated.", action, "rejected."); |
| req.setReply(makeResponse(false, "User " + user + " cannot be validated. " + action + " rejected.", action, -1)); |
| return false; |
| } |
| return true; |
| } |
| |
| public boolean orchestratorAlive(String action, AServiceRequest req) |
| { |
| String methodName = "orchestratorAlive"; |
| if ( orchestrator_alive ) return true; |
| |
| logger.warn(methodName, null, action, "rejected: orchestrator is not yet active"); |
| req.setReply(makeResponse(false, action + " rejected, DUCC is still initializing.", action, -1)); |
| return false; |
| } |
| |
| public synchronized void register(ServiceRegisterEvent ev) |
| { |
| String methodName = "register"; |
| DuccProperties props = ev.getDescriptor(); |
| String endpoint = ev.getEndpoint(); |
| int instances = ev.getNinstances(); |
| Trinary autostart = ev.getAutostart(); |
| String user = ev.getUser(); |
| long regdate = System.currentTimeMillis(); |
| String regdate_readable = (new Date(regdate)).toString(); |
| |
| if ( ! validate_ducc_head("Register", ev) ) return; |
| if ( ! validate_user("Register", ev) ) return; // necessary messages emitted in here |
| if ( ! orchestratorAlive("Register", ev) ) return; |
| |
| DuccId id = null; |
| try { |
| id = newId(); |
| } catch ( Exception e ) { |
| logger.error(methodName, null, e); |
| ev.setReply(makeResponse(false, "Internal error; unable to generate id", endpoint, -1)); |
| return; |
| } |
| logger.debug(methodName, id, "Unique:", id.getUnique()); |
| |
| String logdir = props.getProperty(UiOption.LogDirectory.pname()); |
| if ( !logdir.endsWith("/") ) { |
| logdir = logdir + "/"; |
| } |
| logdir = logdir + "S-" + id.toString(); |
| props.put(UiOption.LogDirectory.pname(), logdir); |
| |
| DuccProperties meta = new DuccProperties(); |
| meta.setProperty(SvcMetaProps.user.pname(), user); |
| meta.setProperty(SvcMetaProps.instances.pname(), ""+instances); |
| meta.setProperty(SvcMetaProps.endpoint.pname(), endpoint); |
| meta.setProperty(SvcMetaProps.numeric_id.pname(), id.toString()); |
| meta.setProperty(SvcMetaProps.uuid.pname(), id.getUnique()); |
| meta.setProperty(SvcMetaProps.registration_date_millis.pname(), Long.toString(regdate)); |
| meta.setProperty(SvcMetaProps.registration_date.pname(), regdate_readable); |
| |
| if ( autostart == Trinary.True ) { |
| meta.setProperty(SvcMetaProps.autostart.pname(), "true"); |
| } else { |
| meta.setProperty(SvcMetaProps.autostart.pname(), "false"); |
| } |
| |
| ServiceReplyEvent reply = handler.register(id, props, meta, false); |
| ev.setReply(reply); |
| |
| // Draw attention in the log on registration failures |
| if ( reply.getReturnCode() ) { |
| logger.info(methodName, id, ev.toString()); |
| } else { |
| logger.warn(methodName, id, ev.toString()); |
| } |
| } |
| |
| public synchronized void unregister(ServiceUnregisterEvent ev) |
| { |
| if ( ! validate_ducc_head("Unregister", ev) ) return; |
| if ( ! validate_user("Unregister", ev) ) return; // necessary messages emitted in here |
| if ( ! orchestratorAlive("Unregister", ev) ) return; |
| |
| ServiceReplyEvent reply = handler.unregister(ev); |
| ev.setReply(reply); |
| } |
| |
| public synchronized void start(ServiceStartEvent ev) |
| { |
| String methodName = "start"; |
| |
| if ( ! validate_ducc_head("Start", ev) ) return; |
| if ( ! validate_user("Start", ev) ) return; // necessary messages emitted in here |
| if ( ! orchestratorAlive("Start", ev) ) return; |
| |
| logger.info(methodName, null, "Starting service", ev.toString()); |
| ServiceReplyEvent reply = handler.start(ev); |
| ev.setReply(reply); |
| //ev.setReply(ServiceCode.OK, "Service not implemented.", "no-endpoint", null); |
| } |
| |
| public synchronized void stop(ServiceStopEvent ev) |
| { |
| String methodName = "stop"; |
| |
| if ( ! validate_ducc_head("Stop", ev) ) return; |
| if ( ! validate_user("Stop", ev) ) return; // necessary messages emitted in here |
| if ( ! orchestratorAlive("Stop", ev) ) return; |
| |
| logger.info(methodName, null, "Stopping service", ev.toString()); |
| ServiceReplyEvent reply = handler.stop(ev); |
| ev.setReply(reply); |
| //ev.setReply(ServiceCode.OK, "Service not implemented.", "no-endpoint", null); |
| } |
| |
| public synchronized void enable(ServiceEnableEvent ev) |
| { |
| String methodName = "enable"; |
| |
| if ( ! validate_ducc_head("Enable", ev) ) return; |
| if ( ! validate_user("Enable", ev) ) return; // necessary messages emitted in here |
| if ( ! orchestratorAlive("Enable", ev) ) return; |
| |
| logger.info(methodName, null, "Enabling service", ev.toString()); |
| ServiceReplyEvent reply = handler.enable(ev); |
| ev.setReply(reply); |
| //ev.setReply(ServiceCode.OK, "Service not implemented.", "no-endpoint", null); |
| } |
| |
| public synchronized void disable(ServiceDisableEvent ev) |
| { |
| String methodName = "disable"; |
| |
| if ( ! validate_ducc_head("Disable", ev) ) return; |
| if ( ! validate_user("Disable", ev) ) return; // necessary messages emitted in here |
| if ( ! orchestratorAlive("Disable", ev) ) return; |
| |
| logger.info(methodName, null, "Disabling service", ev.toString()); |
| ServiceReplyEvent reply = handler.disable(ev); |
| ev.setReply(reply); |
| //ev.setReply(ServiceCode.OK, "Service not implemented.", "no-endpoint", null); |
| } |
| |
| public synchronized void observe(ServiceObserveEvent ev) |
| { |
| String methodName = "observe"; |
| |
| if ( ! validate_ducc_head("Observe", ev) ) return; |
| if ( ! validate_user("Observe", ev) ) return; // necessary messages emitted in here |
| if ( ! orchestratorAlive("Observe", ev) ) return; |
| |
| logger.info(methodName, null, "Observing references for service", ev.toString()); |
| ServiceReplyEvent reply = handler.observe(ev); |
| ev.setReply(reply); |
| //ev.setReply(ServiceCode.OK, "Service not implemented.", "no-endpoint", null); |
| } |
| |
| public synchronized void ignore(ServiceIgnoreEvent ev) |
| { |
| String methodName = "ignore"; |
| |
| if ( ! validate_ducc_head("Ignore", ev) ) return; |
| if ( ! validate_user("Ignore", ev) ) return; // necessary messages emitted in here |
| if ( ! orchestratorAlive("Ignore", ev) ) return; |
| |
| logger.info(methodName, null, "Ignoring references for service", ev.toString()); |
| ServiceReplyEvent reply = handler.ignore(ev); |
| ev.setReply(reply); |
| //ev.setReply(ServiceCode.OK, "Service not implemented.", "no-endpoint", null); |
| } |
| |
| public synchronized void query(ServiceQueryEvent ev) |
| { |
| String methodName = "query"; |
| |
| if ( ! validate_ducc_head("Query", ev) ) return; |
| if ( ! validate_user("Query", ev) ) return; // necessary messages emitted in here |
| if ( ! orchestratorAlive("Query", ev) ) return; |
| |
| logger.info(methodName, null, "Query", ev.toString()); |
| ServiceReplyEvent reply = handler.query(ev); |
| ev.setReply(reply); |
| //ev.setReply(ServiceCode.OK, "Service not implemented.", "no-endpoint", null); |
| } |
| |
| public synchronized void modify(ServiceModifyEvent ev) |
| { |
| String methodName = "modify"; |
| |
| if ( ! validate_ducc_head("Modify", ev) ) return; |
| if ( ! validate_user("Modify", ev) ) return; // necessary messages emitted in here |
| if ( ! orchestratorAlive("Modify", ev) ) return; |
| |
| logger.info(methodName, null, "Modify", ev.toString()); |
| ServiceReplyEvent reply = handler.modify(ev); |
| ev.setReply(reply); |
| //ev.setReply(ServiceCode.OK, "Service not implemented.", "no-endpoint", null); |
| } |
| |
| static void deleteProperties(String id, String meta_filename, Properties meta_props, String props_filename, Properties job_props) |
| { |
| // NOTE: During init we may now know the ID as a DuccId so it has to be passed in as a string |
| |
| String methodName = "deleteProperties"; |
| // Save a copy in history, and then delete the original |
| String history_dir = serviceHistoryLocation(); |
| if ( meta_filename != null ) { |
| File mfh = new File(history_dir + id + ".meta"); |
| try { |
| FileOutputStream fos = new FileOutputStream(mfh); |
| meta_props.store(fos, "Archived meta descriptor"); |
| fos.close(); |
| } catch (Exception e) { |
| logger.warn(methodName, null, id + ": Unable to save history to \"" + mfh.toString(), ": ", e.toString() + "\""); |
| } |
| |
| File mf = new File(meta_filename); |
| mf.delete(); |
| } |
| meta_filename = null; |
| |
| if ( props_filename != null ) { |
| File pfh = new File(history_dir + id + ".svc"); |
| try { |
| FileOutputStream fos = new FileOutputStream(pfh); |
| job_props.store(fos, "Archived svc properties."); |
| fos.close(); |
| } catch (Exception e) { |
| logger.warn(methodName, null, id + ":Unable to save history to \"" + pfh.toString(), ": ", e.toString() + "\""); |
| } |
| |
| File pf = new File(props_filename); |
| pf.delete(); |
| } |
| props_filename = null; |
| } |
| |
| @Override |
| public DuccId newId() throws Exception { |
| return idFactory.next(); |
| } |
| |
| } |