blob: da937ebc6b4507480fc93765b744680fce6ef248 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
package org.apache.airavata.api.server.listener;
import com.google.common.eventbus.Subscribe;
import org.apache.airavata.api.server.util.DataModelUtils;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
import org.apache.airavata.model.util.ExecutionType;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.experiment.ExperimentModelState;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Calendar;
public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
private ExperimentCatalog airavataExperimentCatalog;
private LocalEventPublisher localEventPublisher;
private Publisher publisher;
private CuratorFramework curatorClient;
private RabbitMQTaskLaunchConsumer consumer;
public ExperimentCatalog getAiravataExperimentCatalog() {
return airavataExperimentCatalog;
}
public void setAiravataExperimentCatalog(ExperimentCatalog airavataExperimentCatalog) {
this.airavataExperimentCatalog = airavataExperimentCatalog;
}
@Subscribe
public void setupExperimentStatus(WorkflowNodeStatusChangeEvent nodeStatus) throws Exception{
try {
boolean updateExperimentStatus=true;
boolean clean= false;
ExecutionType executionType = DataModelUtils.getExecutionType((Experiment) airavataExperimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
String experimentPath = experimentNode + File.separator + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)
+ File.separator + nodeStatus.getWorkflowNodeIdentity().getExperimentId();
ExperimentState state;
switch (nodeStatus.getState()) {
case CANCELED:
state = ExperimentState.CANCELED; updateExperimentStatus = true;
clean = true;
break;
case COMPLETED:
if(executionType.equals(ExecutionType.SINGLE_APP)){
state = ExperimentState.COMPLETED; updateExperimentStatus = true;
}else{
state = ExperimentState.EXECUTING; updateExperimentStatus = true;
}
clean = true;
break;
case INVOKED:
state = ExperimentState.EXECUTING; updateExperimentStatus = false;
break;
case FAILED:
state = ExperimentState.FAILED; updateExperimentStatus = true;
clean = true;
break;
case EXECUTING:
state = ExperimentState.EXECUTING; updateExperimentStatus = true;
break;
case CANCELING:
state = ExperimentState.CANCELING; updateExperimentStatus = true;
break;
default:
return;
}
if (!updateExperimentStatus){
updateExperimentStatus=(executionType==ExecutionType.SINGLE_APP);
}
updateExperimentStatus(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), state);
logger.debug("Publishing experiment status for " + nodeStatus.getWorkflowNodeIdentity().getExperimentId() + ":" + state.toString());
ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(state,
nodeStatus.getWorkflowNodeIdentity().getExperimentId(),
nodeStatus.getWorkflowNodeIdentity().getGatewayId());
localEventPublisher.publish(event);
String messageId = AiravataUtils.getId("EXPERIMENT");
MessageContext msgCntxt = new MessageContext(event, MessageType.EXPERIMENT, messageId, nodeStatus.getWorkflowNodeIdentity().getGatewayId());
msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
publisher.publish(msgCntxt);
if (clean) {
cleanup(nodeStatus, experimentNode, experimentPath);
}
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
throw new Exception("Error persisting experiment status..", e);
}
}
private void cleanup(WorkflowNodeStatusChangeEvent nodeStatus, String experimentNode, String experimentPath) throws Exception {
int count = 0;
long deliveryTag = AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), curatorClient,
experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
if(deliveryTag>0) {
if (ServerSettings.isGFacPassiveMode()) {
while (!consumer.isOpen() && count < 3) {
try {
consumer.reconnect();
} catch (AiravataException e) {
count++;
}
}
try {
if (consumer.isOpen()) {
consumer.sendAck(deliveryTag);
}
} catch (Exception e) {
logger.error("Error sending the Ack ! If the worker pick this again airavata should gracefully handle !");
}
}
}
if (curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX) != null) {
ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, true);
}
if (curatorClient.checkExists().forPath(experimentPath) != null) {
ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath, true);
}
// ack cancel operation if exist
long cancelDT = AiravataZKUtils.getCancelDeliveryTagIfExist(nodeStatus.getWorkflowNodeIdentity().getExperimentId(),
curatorClient, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
count = 0;
if (cancelDT > 0) {
while (!consumer.isOpen() && count < 3) {
try {
consumer.reconnect();
} catch (AiravataException e) {
count++;
}
}
try {
if (consumer.isOpen()) {
consumer.sendAck(cancelDT);
}
} catch (Exception e) {
logger.error("Error sending the Ack for cancel operation, cancel experiment path : " + experimentPath);
}
}
if (cancelDT > 0) {
ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
}
}
public ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws Exception {
Experiment details = (Experiment) airavataExperimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
if(details == null) {
details = new Experiment();
details.setExperimentID(experimentId);
}
org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
status.setExperimentState(state);
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
status.setExperimentState(state);
details.setExperimentStatus(status);
logger.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getExperimentState().toString());
airavataExperimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, experimentId);
return details.getExperimentStatus().getExperimentState();
}
public void setup(Object... configurations) {
for (Object configuration : configurations) {
if (configuration instanceof ExperimentCatalog){
this.airavataExperimentCatalog =(ExperimentCatalog)configuration;
} else if (configuration instanceof LocalEventPublisher){
this.localEventPublisher =(LocalEventPublisher) configuration;
} else if (configuration instanceof Publisher){
this.publisher=(Publisher) configuration;
}else if (configuration instanceof RabbitMQTaskLaunchConsumer) {
this.consumer = (RabbitMQTaskLaunchConsumer) configuration;
}else if (configuration instanceof CuratorFramework) {
this.curatorClient = (CuratorFramework) configuration;
}
}
}
}
*/