| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| package org.apache.airavata.orchestrator.server; |
| |
| import org.airavata.appcatalog.cpi.AppCatalog; |
| import org.airavata.appcatalog.cpi.AppCatalogException; |
| import org.airavata.appcatalog.cpi.ComputeResource; |
| import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; |
| import org.apache.aiaravata.application.catalog.data.resources.AbstractResource; |
| import org.apache.airavata.common.exception.AiravataException; |
| import org.apache.airavata.common.exception.ApplicationSettingsException; |
| import org.apache.airavata.common.logger.AiravataLogger; |
| import org.apache.airavata.common.logger.AiravataLoggerFactory; |
| import org.apache.airavata.common.utils.AiravataUtils; |
| import org.apache.airavata.common.utils.AiravataZKUtils; |
| import org.apache.airavata.common.utils.Constants; |
| import org.apache.airavata.common.utils.ServerSettings; |
| import org.apache.airavata.credential.store.store.CredentialReader; |
| import org.apache.airavata.gfac.core.scheduler.HostScheduler; |
| import org.apache.airavata.gfac.core.utils.GFacUtils; |
| import org.apache.airavata.messaging.core.MessageContext; |
| import org.apache.airavata.messaging.core.MessageHandler; |
| import org.apache.airavata.messaging.core.MessagingConstants; |
| import org.apache.airavata.messaging.core.Publisher; |
| import org.apache.airavata.messaging.core.PublisherFactory; |
| import org.apache.airavata.messaging.core.impl.RabbitMQProcessConsumer; |
| import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher; |
| import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; |
| import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; |
| import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; |
| import org.apache.airavata.model.error.LaunchValidationException; |
| import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; |
| import org.apache.airavata.model.messaging.event.MessageType; |
| import org.apache.airavata.model.messaging.event.ProcessSubmitEvent; |
| import org.apache.airavata.model.util.ExecutionType; |
| import org.apache.airavata.model.workspace.experiment.Experiment; |
| import org.apache.airavata.model.workspace.experiment.ExperimentState; |
| import org.apache.airavata.model.workspace.experiment.ExperimentStatus; |
| import org.apache.airavata.model.workspace.experiment.TaskDetails; |
| import org.apache.airavata.model.workspace.experiment.TaskState; |
| import org.apache.airavata.model.workspace.experiment.TaskStatus; |
| import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; |
| import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; |
| import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus; |
| import org.apache.airavata.orchestrator.core.exception.OrchestratorException; |
| import org.apache.airavata.orchestrator.cpi.OrchestratorService; |
| import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl; |
| import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants; |
| import org.apache.airavata.orchestrator.util.DataModelUtils; |
| import org.apache.airavata.orchestrator.util.OrchestratorServerThreadPoolExecutor; |
| import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; |
| import org.apache.airavata.registry.cpi.Registry; |
| import org.apache.airavata.registry.cpi.RegistryException; |
| import org.apache.airavata.registry.cpi.RegistryModelType; |
| import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetailConstants; |
| import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants; |
| import org.apache.airavata.workflow.core.WorkflowEnactmentService; |
| import org.apache.thrift.TBase; |
| import org.apache.thrift.TException; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.data.Stat; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Calendar; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| |
| public class OrchestratorServerHandler implements OrchestratorService.Iface, |
| Watcher { |
| private static AiravataLogger log = AiravataLoggerFactory |
| .getLogger(OrchestratorServerHandler.class); |
| |
| private SimpleOrchestratorImpl orchestrator = null; |
| |
| private Registry registry; |
| |
| private ZooKeeper zk; |
| |
| private static Integer mutex = new Integer(-1); |
| |
| private String airavataUserName; |
| private String gatewayName; |
| private Publisher publisher; |
| |
| private RabbitMQProcessConsumer rabbitMQProcessConsumer; |
| private RabbitMQProcessPublisher rabbitMQProcessPublisher; |
| |
| /** |
| * Query orchestrator server to fetch the CPI version |
| */ |
| public String getOrchestratorCPIVersion() throws TException { |
| |
| return orchestrator_cpi_serviceConstants.ORCHESTRATOR_CPI_VERSION; |
| } |
| |
| public OrchestratorServerHandler() throws OrchestratorException{ |
| // registering with zk |
| try { |
| if (ServerSettings.isRabbitMqPublishEnabled()) { |
| publisher = PublisherFactory.createActivityPublisher(); |
| } |
| String zkhostPort = AiravataZKUtils.getZKhostPort(); |
| String airavataServerHostPort = ServerSettings |
| .getSetting(Constants.ORCHESTRATOR_SERVER_HOST) |
| + ":" |
| + ServerSettings |
| .getSetting(Constants.ORCHESTRATOR_SERVER_PORT); |
| |
| // setGatewayName(ServerSettings.getDefaultUserGateway()); |
| setAiravataUserName(ServerSettings.getDefaultUser()); |
| try { |
| zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is |
| // required, this |
| // will only use to |
| // store some data |
| String OrchServer = ServerSettings |
| .getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE); |
| synchronized (mutex) { |
| mutex.wait(); // waiting for the syncConnected event |
| } |
| registerOrchestratorService(airavataServerHostPort, OrchServer); |
| // creating a watch in orchestrator to monitor the gfac |
| // instances |
| zk.getChildren(ServerSettings.getSetting( |
| Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"), |
| this); |
| log.info("Finished starting ZK: " + zk); |
| } catch (IOException e) { |
| log.error(e.getMessage(), e); |
| throw new OrchestratorException("Error while initializing orchestrator service", e); |
| } catch (InterruptedException e) { |
| log.error(e.getMessage(), e); |
| throw new OrchestratorException("Error while initializing orchestrator service", e); |
| } catch (KeeperException e) { |
| log.error(e.getMessage(), e); |
| throw new OrchestratorException("Error while initializing orchestrator service", e); |
| } |
| } catch (ApplicationSettingsException e) { |
| log.error(e.getMessage(), e); |
| throw new OrchestratorException("Error while initializing orchestrator service", e); |
| }catch (AiravataException e) { |
| log.error(e.getMessage(), e); |
| throw new OrchestratorException("Error while initializing orchestrator service", e); |
| } |
| // orchestrator init |
| try { |
| // first constructing the monitorManager and orchestrator, then fill |
| // the required properties |
| orchestrator = new SimpleOrchestratorImpl(); |
| registry = RegistryFactory.getDefaultRegistry(); |
| orchestrator.initialize(); |
| orchestrator.getOrchestratorContext().setZk(this.zk); |
| orchestrator.getOrchestratorContext().setPublisher(this.publisher); |
| startProcessConsumer(); |
| } catch (OrchestratorException e) { |
| log.error(e.getMessage(), e); |
| throw new OrchestratorException("Error while initializing orchestrator service", e); |
| } catch (RegistryException e) { |
| log.error(e.getMessage(), e); |
| throw new OrchestratorException("Error while initializing orchestrator service", e); |
| } |
| } |
| |
| private void startProcessConsumer() throws OrchestratorException { |
| try { |
| rabbitMQProcessConsumer = new RabbitMQProcessConsumer(); |
| ProcessConsumer processConsumer = new ProcessConsumer(); |
| Thread thread = new Thread(processConsumer); |
| thread.start(); |
| |
| } catch (AiravataException e) { |
| throw new OrchestratorException("Error while starting process consumer", e); |
| } |
| |
| } |
| |
| private void registerOrchestratorService(String airavataServerHostPort, String orchServer) throws KeeperException, InterruptedException { |
| Stat zkStat = zk.exists(orchServer, false); |
| if (zkStat == null) { |
| zk.create(orchServer, new byte[0], |
| ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); |
| } |
| String instantNode = orchServer |
| + File.separator |
| + String.valueOf(new Random() |
| .nextInt(Integer.MAX_VALUE)); |
| zkStat = zk.exists(instantNode, false); |
| if (zkStat == null) { |
| zk.create(instantNode, airavataServerHostPort.getBytes(), |
| ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); |
| } |
| } |
| |
| /** |
| * * After creating the experiment Data user have the * experimentID as the |
| * handler to the experiment, during the launchExperiment * We just have to |
| * give the experimentID * * @param experimentID * @return sucess/failure * |
| * * |
| * |
| * @param experimentId |
| */ |
| public boolean launchExperiment(String experimentId, String token) throws TException { |
| Experiment experiment = null; // this will inside the bottom catch statement |
| try { |
| experiment = (Experiment) registry.get( |
| RegistryModelType.EXPERIMENT, experimentId); |
| if (experiment == null) { |
| log.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {} ", experimentId); |
| return false; |
| } |
| CredentialReader credentialReader = GFacUtils.getCredentialReader(); |
| String gatewayId = null; |
| if (credentialReader != null) { |
| try { |
| gatewayId = credentialReader.getGatewayID(token); |
| } catch (Exception e) { |
| log.error(e.getLocalizedMessage()); |
| } |
| } |
| if (gatewayId == null) { |
| throw new AiravataException("Couldn't identify the gateway Id using the credential token"); |
| } |
| ExecutionType executionType = DataModelUtils.getExecutionType(gatewayId, experiment); |
| if (executionType == ExecutionType.SINGLE_APP) { |
| //its an single application execution experiment |
| log.debugId(experimentId, "Launching single application experiment {}.", experimentId); |
| OrchestratorServerThreadPoolExecutor.getFixedThreadPool().execute(new SingleAppExperimentRunner(experimentId, token)); |
| } else if (executionType == ExecutionType.WORKFLOW) { |
| //its a workflow execution experiment |
| log.debugId(experimentId, "Launching workflow experiment {}.", experimentId); |
| launchWorkflowExperiment(experimentId, token); |
| } else { |
| log.errorId(experimentId, "Couldn't identify experiment type, experiment {} is neither single application nor workflow.", experimentId); |
| throw new TException("Experiment '" + experimentId + "' launch failed. Unable to figureout execution type for application " + experiment.getApplicationId()); |
| } |
| } catch (Exception e) { |
| throw new TException("Experiment '" + experimentId + "' launch failed. Unable to figureout execution type for application " + experiment.getApplicationId()); |
| } |
| return true; |
| } |
| |
| /** |
| * This method will validate the experiment before launching, if is failed |
| * we do not run the launch in airavata thrift service (only if validation |
| * is enabled |
| * |
| * @param experimentId |
| * @return |
| * @throws TException |
| */ |
| public boolean validateExperiment(String experimentId) throws TException, |
| LaunchValidationException { |
| // TODO: Write the Orchestrator implementaion |
| try { |
| List<TaskDetails> tasks = orchestrator.createTasks(experimentId); |
| if (tasks.size() > 1) { |
| log.info("There are multiple tasks for this experiment, So Orchestrator will launch multiple Jobs"); |
| } |
| List<String> ids = registry.getIds( |
| RegistryModelType.WORKFLOW_NODE_DETAIL, |
| WorkflowNodeConstants.EXPERIMENT_ID, experimentId); |
| for (String workflowNodeId : ids) { |
| WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry |
| .get(RegistryModelType.WORKFLOW_NODE_DETAIL, |
| workflowNodeId); |
| List<Object> taskDetailList = registry.get( |
| RegistryModelType.TASK_DETAIL, |
| TaskDetailConstants.NODE_ID, workflowNodeId); |
| for (Object o : taskDetailList) { |
| TaskDetails taskID = (TaskDetails) o; |
| // iterate through all the generated tasks and performs the |
| // job submisssion+monitoring |
| Experiment experiment = (Experiment) registry.get( |
| RegistryModelType.EXPERIMENT, experimentId); |
| if (experiment == null) { |
| log.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {}.", |
| experimentId); |
| return false; |
| } |
| return orchestrator.validateExperiment(experiment, |
| workflowNodeDetail, taskID).isSetValidationState(); |
| } |
| } |
| |
| } catch (OrchestratorException e) { |
| log.errorId(experimentId, "Error while validating experiment", e); |
| throw new TException(e); |
| } catch (RegistryException e) { |
| log.errorId(experimentId, "Error while validating experiment", e); |
| throw new TException(e); |
| } |
| return false; |
| } |
| |
| /** |
| * This can be used to cancel a running experiment and store the status to |
| * terminated in registry |
| * |
| * @param experimentId |
| * @return |
| * @throws TException |
| */ |
| public boolean terminateExperiment(String experimentId) throws TException { |
| log.infoId(experimentId, "Experiment: {} is cancelling !!!!!", experimentId); |
| return validateStatesAndCancel(experimentId); |
| } |
| |
| /** |
| * This method gracefully handler gfac node failures |
| */ |
| synchronized public void process(WatchedEvent watchedEvent) { |
| synchronized (mutex) { |
| try { |
| Event.KeeperState state = watchedEvent.getState(); |
| switch (state) { |
| case SyncConnected: |
| mutex.notify(); |
| break; |
| case Expired:case Disconnected: |
| try { |
| zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this); |
| synchronized (mutex) { |
| mutex.wait(); // waiting for the syncConnected event |
| } |
| String airavataServerHostPort = ServerSettings |
| .getSetting(Constants.ORCHESTRATOR_SERVER_HOST) |
| + ":" |
| + ServerSettings |
| .getSetting(Constants.ORCHESTRATOR_SERVER_PORT); |
| String OrchServer = ServerSettings |
| .getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE); |
| registerOrchestratorService(airavataServerHostPort, OrchServer); |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } catch (ApplicationSettingsException e) { |
| e.printStackTrace(); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } catch (KeeperException e) { |
| e.printStackTrace(); |
| } |
| break; |
| } |
| if (watchedEvent.getPath() != null |
| && watchedEvent.getPath().startsWith( |
| ServerSettings.getSetting( |
| Constants.ZOOKEEPER_GFAC_SERVER_NODE, |
| "/gfac-server"))) { |
| List<String> children = zk.getChildren(ServerSettings |
| .getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, |
| "/gfac-server"), true); |
| for (String gfacNodes : children) { |
| zk.exists( |
| ServerSettings.getSetting( |
| Constants.ZOOKEEPER_GFAC_SERVER_NODE, |
| "/gfac-server") |
| + File.separator + gfacNodes, this); |
| } |
| switch (watchedEvent.getType()) { |
| case NodeCreated: |
| mutex.notify(); |
| break; |
| case NodeDeleted: |
| // here we have to handle gfac node shutdown case |
| if (children.size() == 0) { |
| log.error("There are not gfac instances to route failed jobs"); |
| return; |
| } |
| // we recover one gfac node at a time |
| final WatchedEvent event = watchedEvent; |
| final OrchestratorServerHandler handler = this; |
| /*(new Thread() { // disabling ft implementation with zk |
| public void run() { |
| int retry = 0; |
| while (retry < 3) { |
| try { |
| (new OrchestratorRecoveryHandler( |
| handler, event.getPath())) |
| .recover(); |
| break; |
| } catch (Exception e) { |
| e.printStackTrace(); |
| log.error("error recovering the jobs for gfac-node: " |
| + event.getPath()); |
| log.error("Retrying again to recover jobs and retry attempt: " |
| + ++retry); |
| } |
| } |
| |
| } |
| }).start();*/ |
| break; |
| } |
| |
| |
| } |
| } catch (KeeperException e) { |
| e.printStackTrace(); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| |
| private String getAiravataUserName() { |
| return airavataUserName; |
| } |
| |
| private String getGatewayName() { |
| return gatewayName; |
| } |
| |
| public void setAiravataUserName(String airavataUserName) { |
| this.airavataUserName = airavataUserName; |
| } |
| |
| public void setGatewayName(String gatewayName) { |
| this.gatewayName = gatewayName; |
| } |
| |
| @Override |
| public boolean launchTask(String taskId, String airavataCredStoreToken) throws TException { |
| try { |
| TaskDetails taskData = (TaskDetails) registry.get( |
| RegistryModelType.TASK_DETAIL, taskId); |
| String applicationId = taskData.getApplicationId(); |
| if (applicationId == null) { |
| log.errorId(taskId, "Application id shouldn't be null."); |
| throw new OrchestratorException("Error executing the job, application id shouldn't be null."); |
| } |
| ApplicationDeploymentDescription applicationDeploymentDescription = getAppDeployment(taskData, applicationId); |
| taskData.setApplicationDeploymentId(applicationDeploymentDescription.getAppDeploymentId()); |
| registry.update(RegistryModelType.TASK_DETAIL, taskData,taskData.getTaskID()); |
| List<Object> workflowNodeDetailList = registry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, |
| org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants.TASK_LIST, taskData); |
| if (workflowNodeDetailList != null |
| && workflowNodeDetailList.size() > 0) { |
| List<Object> experimentList = registry.get(RegistryModelType.EXPERIMENT, |
| org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.ExperimentConstants.WORKFLOW_NODE_LIST, |
| (WorkflowNodeDetails) workflowNodeDetailList.get(0)); |
| if (experimentList != null && experimentList.size() > 0) { |
| return orchestrator |
| .launchExperiment( |
| (Experiment) experimentList.get(0), |
| (WorkflowNodeDetails) workflowNodeDetailList |
| .get(0), taskData,airavataCredStoreToken); |
| } |
| } |
| } catch (Exception e) { |
| log.errorId(taskId, "Error while launching task ", e); |
| throw new TException(e); |
| } |
| log.infoId(taskId, "No experiment found associated in task {}", taskId); |
| return false; |
| } |
| |
| private ApplicationDeploymentDescription getAppDeployment( |
| TaskDetails taskData, String applicationId) |
| throws AppCatalogException, OrchestratorException, |
| ClassNotFoundException, ApplicationSettingsException, |
| InstantiationException, IllegalAccessException { |
| AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); |
| String selectedModuleId = getModuleId(appCatalog, applicationId); |
| ApplicationDeploymentDescription applicationDeploymentDescription = getAppDeployment( |
| appCatalog, taskData, selectedModuleId); |
| return applicationDeploymentDescription; |
| } |
| |
| private ApplicationDeploymentDescription getAppDeployment( |
| AppCatalog appCatalog, TaskDetails taskData, String selectedModuleId) |
| throws AppCatalogException, ClassNotFoundException, |
| ApplicationSettingsException, InstantiationException, |
| IllegalAccessException { |
| Map<String, String> moduleIdFilter = new HashMap<String, String>(); |
| moduleIdFilter.put(AbstractResource.ApplicationDeploymentConstants.APP_MODULE_ID, selectedModuleId); |
| if (taskData.getTaskScheduling()!=null && taskData.getTaskScheduling().getResourceHostId() != null) { |
| moduleIdFilter.put(AbstractResource.ApplicationDeploymentConstants.COMPUTE_HOST_ID, taskData.getTaskScheduling().getResourceHostId()); |
| } |
| List<ApplicationDeploymentDescription> applicationDeployements = appCatalog.getApplicationDeployment().getApplicationDeployements(moduleIdFilter); |
| Map<ComputeResourceDescription, ApplicationDeploymentDescription> deploymentMap = new HashMap<ComputeResourceDescription, ApplicationDeploymentDescription>(); |
| ComputeResource computeResource = appCatalog.getComputeResource(); |
| for (ApplicationDeploymentDescription deploymentDescription : applicationDeployements) { |
| deploymentMap.put(computeResource.getComputeResource(deploymentDescription.getComputeHostId()),deploymentDescription); |
| } |
| List<ComputeResourceDescription> computeHostList = Arrays.asList(deploymentMap.keySet().toArray(new ComputeResourceDescription[]{})); |
| Class<? extends HostScheduler> aClass = Class.forName( |
| ServerSettings.getHostScheduler()).asSubclass( |
| HostScheduler.class); |
| HostScheduler hostScheduler = aClass.newInstance(); |
| ComputeResourceDescription ComputeResourceDescription = hostScheduler.schedule(computeHostList); |
| ApplicationDeploymentDescription applicationDeploymentDescription = deploymentMap.get(ComputeResourceDescription); |
| return applicationDeploymentDescription; |
| } |
| |
| private String getModuleId(AppCatalog appCatalog, String applicationId) |
| throws AppCatalogException, OrchestratorException { |
| ApplicationInterfaceDescription applicationInterface = appCatalog.getApplicationInterface().getApplicationInterface(applicationId); |
| List<String> applicationModules = applicationInterface.getApplicationModules(); |
| if (applicationModules.size()==0){ |
| throw new OrchestratorException( |
| "No modules defined for application " |
| + applicationId); |
| } |
| // AiravataAPI airavataAPI = getAiravataAPI(); |
| String selectedModuleId=applicationModules.get(0); |
| return selectedModuleId; |
| } |
| |
| private boolean validateStatesAndCancel(String experimentId)throws TException{ |
| try { |
| Experiment experiment = (Experiment) registry.get( |
| RegistryModelType.EXPERIMENT, experimentId); |
| if (experiment == null) { |
| log.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {}.", experimentId); |
| throw new OrchestratorException("Error retrieving the Experiment by the given experimentID: " + experimentId); |
| } |
| ExperimentState experimentState = experiment.getExperimentStatus().getExperimentState(); |
| if (experimentState.getValue()> 5 && experimentState.getValue()<10) { |
| log.errorId(experimentId, "Unable to mark experiment as Cancelled, current state {} doesn't allow to cancel the experiment {}.", |
| experiment.getExperimentStatus().getExperimentState().toString(), experimentId); |
| throw new OrchestratorException("Unable to mark experiment as Cancelled, because current state is: " |
| + experiment.getExperimentStatus().getExperimentState().toString()); |
| }else if(experimentState.getValue()<3){ |
| // when experiment status is < 3 no jobDetails object is created, |
| // so we don't have to worry, we simply have to change the status and stop the execution |
| ExperimentStatus status = new ExperimentStatus(); |
| status.setExperimentState(ExperimentState.CANCELED); |
| status.setTimeOfStateChange(Calendar.getInstance() |
| .getTimeInMillis()); |
| experiment.setExperimentStatus(status); |
| registry.update(RegistryModelType.EXPERIMENT, experiment, |
| experimentId); |
| List<String> ids = registry.getIds( |
| RegistryModelType.WORKFLOW_NODE_DETAIL, |
| WorkflowNodeConstants.EXPERIMENT_ID, experimentId); |
| for (String workflowNodeId : ids) { |
| WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry |
| .get(RegistryModelType.WORKFLOW_NODE_DETAIL, |
| workflowNodeId); |
| WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus(); |
| workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED); |
| workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance() |
| .getTimeInMillis()); |
| workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus); |
| registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail, |
| workflowNodeId); |
| List<Object> taskDetailList = registry.get( |
| RegistryModelType.TASK_DETAIL, |
| TaskDetailConstants.NODE_ID, workflowNodeId); |
| for (Object o : taskDetailList) { |
| TaskDetails taskDetails = (TaskDetails) o; |
| TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus(); |
| taskStatus.setExecutionState(TaskState.CANCELED); |
| taskStatus.setTimeOfStateChange(Calendar.getInstance() |
| .getTimeInMillis()); |
| taskDetails.setTaskStatus(taskStatus); |
| registry.update(RegistryModelType.TASK_DETAIL, o, |
| taskDetails); |
| GFacUtils.setExperimentCancel(experimentId, taskDetails.getTaskID(), zk); |
| } |
| } |
| }else { |
| |
| ExperimentStatus status = new ExperimentStatus(); |
| status.setExperimentState(ExperimentState.CANCELING); |
| status.setTimeOfStateChange(Calendar.getInstance() |
| .getTimeInMillis()); |
| experiment.setExperimentStatus(status); |
| registry.update(RegistryModelType.EXPERIMENT, experiment, |
| experimentId); |
| |
| List<String> ids = registry.getIds( |
| RegistryModelType.WORKFLOW_NODE_DETAIL, |
| WorkflowNodeConstants.EXPERIMENT_ID, experimentId); |
| for (String workflowNodeId : ids) { |
| WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry |
| .get(RegistryModelType.WORKFLOW_NODE_DETAIL, |
| workflowNodeId); |
| int value = workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().getValue(); |
| if ( value> 1 && value < 7) { // we skip the unknown state |
| log.error(workflowNodeDetail.getNodeName() + " Workflow Node status cannot mark as cancelled, because " + |
| "current status is " + workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().toString()); |
| continue; // this continue is very useful not to process deeper loops if the upper layers have non-cancel states |
| } else { |
| WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus(); |
| workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELING); |
| workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance() |
| .getTimeInMillis()); |
| workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus); |
| registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail, |
| workflowNodeId); |
| } |
| List<Object> taskDetailList = registry.get( |
| RegistryModelType.TASK_DETAIL, |
| TaskDetailConstants.NODE_ID, workflowNodeId); |
| for (Object o : taskDetailList) { |
| TaskDetails taskDetails = (TaskDetails) o; |
| TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus(); |
| if (taskStatus.getExecutionState().getValue() > 7 && taskStatus.getExecutionState().getValue()<12) { |
| log.error(((TaskDetails) o).getTaskID() + " Task status cannot mark as cancelled, because " + |
| "current task state is " + ((TaskDetails) o).getTaskStatus().getExecutionState().toString()); |
| continue;// this continue is very useful not to process deeper loops if the upper layers have non-cancel states |
| } else { |
| taskStatus.setExecutionState(TaskState.CANCELING); |
| taskStatus.setTimeOfStateChange(Calendar.getInstance() |
| .getTimeInMillis()); |
| taskDetails.setTaskStatus(taskStatus); |
| registry.update(RegistryModelType.TASK_DETAIL, o, |
| taskDetails.getTaskID()); |
| GFacUtils.setExperimentCancel(experimentId, taskDetails.getTaskID(), zk); |
| } |
| // iterate through all the generated tasks and performs the |
| // job submisssion+monitoring |
| // launching the experiment |
| orchestrator.cancelExperiment(experiment, |
| workflowNodeDetail, taskDetails, null); |
| |
| // after performing gfac level cancel operation |
| // mark task cancelled |
| taskStatus.setExecutionState(TaskState.CANCELED); |
| taskStatus.setTimeOfStateChange(Calendar.getInstance() |
| .getTimeInMillis()); |
| taskDetails.setTaskStatus(taskStatus); |
| registry.update(RegistryModelType.TASK_DETAIL, o, |
| taskDetails.getTaskID()); |
| } |
| // mark workflownode cancelled |
| WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus(); |
| workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED); |
| workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance() |
| .getTimeInMillis()); |
| workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus); |
| registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail, |
| workflowNodeId); |
| } |
| // mark experiment cancelled |
| status = new ExperimentStatus(); |
| status.setExperimentState(ExperimentState.CANCELED); |
| status.setTimeOfStateChange(Calendar.getInstance() |
| .getTimeInMillis()); |
| experiment.setExperimentStatus(status); |
| registry.update(RegistryModelType.EXPERIMENT, experiment, |
| experimentId); |
| } |
| log.info("Experiment: " + experimentId + " is cancelled !!!!!"); |
| |
| } catch (Exception e) { |
| throw new TException(e); |
| } |
| return true; |
| } |
| |
| private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException { |
| try { |
| WorkflowEnactmentService.getInstance(). |
| submitWorkflow(experimentId, airavataCredStoreToken, getGatewayName(), getRabbitMQProcessPublisher()); |
| } catch (Exception e) { |
| log.error("Error while launching workflow", e); |
| } |
| } |
| |
| public synchronized RabbitMQProcessPublisher getRabbitMQProcessPublisher() throws Exception { |
| if (rabbitMQProcessPublisher == null) { |
| rabbitMQProcessPublisher = new RabbitMQProcessPublisher(); |
| } |
| return rabbitMQProcessPublisher; |
| } |
| |
| |
| private class SingleAppExperimentRunner implements Runnable { |
| |
| String experimentId; |
| String airavataCredStoreToken; |
| public SingleAppExperimentRunner(String experimentId,String airavataCredStoreToken){ |
| this.experimentId = experimentId; |
| this.airavataCredStoreToken = airavataCredStoreToken; |
| } |
| @Override |
| public void run() { |
| try { |
| launchSingleAppExperiment(); |
| } catch (TException e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| private boolean launchSingleAppExperiment() throws TException { |
| Experiment experiment = null; |
| try { |
| List<String> ids = registry.getIds(RegistryModelType.WORKFLOW_NODE_DETAIL, WorkflowNodeConstants.EXPERIMENT_ID, experimentId); |
| for (String workflowNodeId : ids) { |
| // WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId); |
| List<Object> taskDetailList = registry.get(RegistryModelType.TASK_DETAIL, TaskDetailConstants.NODE_ID, workflowNodeId); |
| for (Object o : taskDetailList) { |
| TaskDetails taskData = (TaskDetails) o; |
| //iterate through all the generated tasks and performs the job submisssion+monitoring |
| experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentId); |
| if (experiment == null) { |
| log.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {}", experimentId); |
| return false; |
| } |
| ExperimentStatus status = new ExperimentStatus(); |
| status.setExperimentState(ExperimentState.LAUNCHED); |
| status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); |
| experiment.setExperimentStatus(status); |
| registry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId); |
| if (ServerSettings.isRabbitMqPublishEnabled()) { |
| String gatewayId = null; |
| CredentialReader credentialReader = GFacUtils.getCredentialReader(); |
| if (credentialReader != null) { |
| try { |
| gatewayId = credentialReader.getGatewayID(airavataCredStoreToken); |
| } catch (Exception e) { |
| log.error(e.getLocalizedMessage()); |
| } |
| } |
| if(gatewayId == null || gatewayId.isEmpty()){ |
| gatewayId = ServerSettings.getDefaultUserGateway(); |
| } |
| ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.LAUNCHED, |
| experimentId, |
| gatewayId); |
| String messageId = AiravataUtils.getId("EXPERIMENT"); |
| MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, messageId, gatewayId); |
| messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); |
| publisher.publish(messageContext); |
| } |
| registry.update(RegistryModelType.TASK_DETAIL, taskData, taskData.getTaskID()); |
| //launching the experiment |
| launchTask(taskData.getTaskID(), airavataCredStoreToken); |
| } |
| } |
| |
| } catch (Exception e) { |
| // Here we really do not have to do much because only potential failure can happen |
| // is in gfac, if there are errors in gfac, it will handle the experiment/task/job statuses |
| // We might get failures in registry access before submitting the jobs to gfac, in that case we |
| // leave the status of these as created. |
| ExperimentStatus status = new ExperimentStatus(); |
| status.setExperimentState(ExperimentState.FAILED); |
| status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); |
| experiment.setExperimentStatus(status); |
| try { |
| registry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId); |
| } catch (RegistryException e1) { |
| log.errorId(experimentId, "Error while updating experiment status to " + status.toString(), e); |
| throw new TException(e); |
| } |
| log.errorId(experimentId, "Error while updating task status, hence updated experiment status to " + status.toString(), e); |
| throw new TException(e); |
| } |
| return true; |
| } |
| } |
| |
| private class ProcessConsumer implements Runnable, MessageHandler{ |
| |
| |
| @Override |
| public void run() { |
| try { |
| rabbitMQProcessConsumer.listen(this); |
| } catch (AiravataException e) { |
| log.error("Error while listen to the RabbitMQProcessConsumer"); |
| } |
| } |
| |
| @Override |
| public Map<String, Object> getProperties() { |
| Map<String, Object> props = new HashMap<String, Object>(); |
| props.put(MessagingConstants.RABBIT_QUEUE, RabbitMQProcessPublisher.PROCESS); |
| props.put(MessagingConstants.RABBIT_ROUTING_KEY, RabbitMQProcessPublisher.PROCESS); |
| return props; |
| } |
| |
| @Override |
| public void onMessage(MessageContext msgCtx) { |
| TBase event = msgCtx.getEvent(); |
| if (event instanceof ProcessSubmitEvent) { |
| ProcessSubmitEvent processSubmitEvent = (ProcessSubmitEvent) event; |
| try { |
| launchTask(processSubmitEvent.getTaskId(), processSubmitEvent.getCredentialToken()); |
| } catch (TException e) { |
| log.error("Error while launching task : " + processSubmitEvent.getTaskId()); |
| } |
| } |
| } |
| } |
| |
| } |