blob: ca5e7d055561bfcbe03b009a56abbb3dfde7664a [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.airavata.gfac.server;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.AiravataStartupException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.logging.MDCConstants;
import org.apache.airavata.common.logging.MDCUtil;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.common.utils.ZkConstants;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.credential.store.store.CredentialStoreException;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.cpi.GfacService;
import org.apache.airavata.gfac.cpi.gfac_cpiConstants;
import org.apache.airavata.gfac.impl.Factory;
import org.apache.airavata.gfac.impl.GFacWorker;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.Subscriber;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.ProcessIdentifier;
import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.ExpCatChildDataType;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Thrift handler for the GFac service ({@link GfacService.Iface}).
 *
 * <p>Construction does the following:
 * <ul>
 *   <li>loads the GFac configuration;</li>
 *   <li>starts the Curator (ZooKeeper) client;</li>
 *   <li>registers this server as an EPHEMERAL ZooKeeper node;</li>
 *   <li>creates the fixed-size worker pool;</li>
 *   <li>connects the AMQP status publisher and the process-launch subscriber.</li>
 * </ul>
 *
 * <p>Launch messages are handled by the inner {@code ProcessLaunchMessageHandler}. It records process
 * state in the experiment catalog and in ZooKeeper. It then calls
 * {@link #submitProcess(String, String, String)}, which runs a {@link GFacWorker} on the pool.
 */
public class GfacServerHandler implements GfacService.Iface {
    private static final Logger log = LoggerFactory.getLogger(GfacServerHandler.class);
    private Subscriber processLaunchSubscriber;
    private CuratorFramework curatorClient;
    private Publisher statusPublisher;
    // "{gfacHost}:{gfacPort}", stored as the data of this server's ZooKeeper node
    private String airavataServerHostPort;
    private ExecutorService executorService;
    // NOTE(review): the fields below are never read or written elsewhere in this class; confirm before removing.
    private static int requestCount=0;
    private ExperimentCatalog experimentCatalog;
    private AppCatalog appCatalog;
    private String airavataUserName;
    private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
    private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();

    /**
     * Initializes configuration, ZooKeeper registration, the worker pool and messaging.
     *
     * @throws AiravataStartupException if any initialization step fails; the original exception is the cause
     */
    public GfacServerHandler() throws AiravataStartupException {
        try {
            Factory.loadConfiguration();
            startCuratorClient();
            initZkDataStructure();
            // Create the worker pool BEFORE subscribing. Messages may be delivered as soon as the
            // subscriber is initialized, and submitProcess() must not see a null executorService.
            executorService = Executors.newFixedThreadPool(ServerSettings.getGFacThreadPoolSize());
            initAMQPClient();
        } catch (Exception e) {
            throw new AiravataStartupException("Gfac Server Initialization error ", e);
        }
    }

    /**
     * Obtains the status publisher and registers the process-launch subscriber.
     *
     * <p>The publisher is obtained first, because the launch handler publishes status updates and may run
     * as soon as the subscriber is initialized.
     * NOTE(review): if the subscriber can deliver before {@code Factory.getProcessLaunchSubscriber()}
     * returns, {@code processLaunchSubscriber} may still be null inside the handler — confirm.
     */
    private void initAMQPClient() throws AiravataException {
        // init status publisher
        statusPublisher = Factory.getStatusPublisher();
        // init process consumer
        Factory.initPrcessLaunchSubscriber(new ProcessLaunchMessageHandler());
        processLaunchSubscriber = Factory.getProcessLaunchSubscriber();
    }

    /** Obtains the Curator client from the factory and starts it. */
    private void startCuratorClient() throws ApplicationSettingsException {
        curatorClient = Factory.getCuratorClient();
        curatorClient.start();
    }

    /**
     * Creates the persistent parent nodes and this server's ephemeral registration node.
     *
     * <pre>
     * {GFacUtils.getZKGfacServersParentPath()}
     *   - /{gfacServerName}        EPHEMERAL, data = "{gfacHost}:{gfacPort}"
     * {ZkConstants.ZOOKEEPER_EXPERIMENT_NODE}
     * </pre>
     */
    private void initZkDataStructure() throws Exception {
        airavataServerHostPort = ServerSettings.getGfacServerHost() + ":" + ServerSettings.getGFacServerPort();
        String serversParentPath = GFacUtils.getZKGfacServersParentPath();
        // create PERSISTENT nodes
        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), serversParentPath);
        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), ZkConstants.ZOOKEEPER_EXPERIMENT_NODE);
        // create EPHEMERAL server name node
        String gfacName = ServerSettings.getGFacServerName();
        String serverNodePath = ZKPaths.makePath(serversParentPath, gfacName);
        if (curatorClient.checkExists().forPath(serverNodePath) == null) {
            curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(serverNodePath);
        }
        curatorClient.setData().withVersion(-1).forPath(serverNodePath,
                airavataServerHostPort.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Returns the GFac CPI version.
     *
     * @return {@code gfac_cpiConstants.GFAC_CPI_VERSION}
     */
    @Override
    public String getGFACServiceVersion() throws TException {
        return gfac_cpiConstants.GFAC_CPI_VERSION;
    }

    /**
     * Hands a process to the GFac worker pool for asynchronous execution.
     *
     * <p>The process, gateway and token ids are placed in the SLF4J MDC of the calling thread. The
     * {@link GFacWorker} is then wrapped with {@code MDCUtil.wrapWithMDC}, presumably so that the worker
     * thread carries the same MDC values. The method returns once the worker has been queued; it does not
     * wait for the process to finish.
     *
     * @param processId id of the process model in the registry
     * @param gatewayId gateway identifier
     * @param tokenId   credential token id, passed to the {@link GFacWorker}
     * @return {@code true} once the worker has been queued
     * @throws TException if the worker cannot be created or queued; the underlying exception is the cause
     */
    @Override
    public boolean submitProcess(String processId, String gatewayId, String tokenId) throws TException {
        MDC.put(MDCConstants.PROCESS_ID, processId);
        MDC.put(MDCConstants.GATEWAY_ID, gatewayId);
        MDC.put(MDCConstants.TOKEN_ID, tokenId);
        try {
            executorService.execute(MDCUtil.wrapWithMDC(new GFacWorker(processId, gatewayId, tokenId)));
        } catch (GFacException e) {
            log.error("Failed to submit process", e);
            throw new TException("Failed to submit process", e);
        } catch (CredentialStoreException e) {
            // keep the cause in the log; without it the credential problem cannot be diagnosed
            log.error("Failed to submit process due to credential issue, " +
                    "make sure you are passing a valid credentials", e);
            throw new TException("Failed to submit process due to credential issue, " +
                    "make sure you are passing a valid credential token", e);
        } catch (Exception e) {
            log.error("Unknown error while submitting the process", e);
            throw new TException("Unknown error while submitting the process", e);
        }
        return true;
    }

    /**
     * Not implemented: always returns {@code false} and performs no cancellation.
     *
     * <p>Within this class, cancel requests are only observed through the ZooKeeper cancel-listener node
     * that the launch handler reads.
     */
    @Override
    public boolean cancelProcess(String processId, String gatewayId, String tokenId) throws TException {
        return false;
    }

    /** Consumes messages from the process-launch subscriber and starts the requested processes. */
    private class ProcessLaunchMessageHandler implements MessageHandler {
        // NOTE(review): experimentNode is assigned but never read.
        private final String experimentNode;
        private final String gfacServerName;

        public ProcessLaunchMessageHandler() throws ApplicationSettingsException {
            experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE;
            gfacServerName = ServerSettings.getGFacServerName();
        }

        /**
         * Logs every message and launches the process for {@code LAUNCHPROCESS} messages.
         *
         * <p>Other message types are only logged. The MDC is cleared on every return path.
         */
        @Override
        public void onMessage(MessageContext messageContext) {
            try {
                MDC.put(MDCConstants.GATEWAY_ID, messageContext.getGatewayId());
                // pass both values as arguments so each "{}" placeholder is filled
                log.info(" Message Received with message id {} and with message type: {}",
                        messageContext.getMessageId(), messageContext.getType());
                // enum identity comparison; also safe if the message carries no type
                if (messageContext.getType() == MessageType.LAUNCHPROCESS) {
                    launchProcess(messageContext);
                }
            } finally {
                // clear unconditionally so MDC values never leak into the next message on this thread
                MDC.clear();
            }
        }

        /**
         * Handles a LAUNCHPROCESS message.
         *
         * <p>Steps:
         * <ol>
         *   <li>Deserializes the event into a {@link ProcessSubmitEvent}.</li>
         *   <li>On redelivery, checks whether the process is already active in this instance. If it is,
         *       only the ZooKeeper delivery tag is refreshed and the method returns. If it is not, the
         *       process state recorded in the registry (when present) is used instead of STARTED.</li>
         *   <li>Persists and publishes that status.</li>
         *   <li>Creates the process ZooKeeper nodes and checks the experiment cancel node.</li>
         *   <li>If a cancel was requested while the state is STARTED, moves the process through
         *       CANCELLING to CANCELED, acks the message and returns. If a cancel was requested in any
         *       other state, writes the cancel request to the process cancel node.</li>
         *   <li>Submits the process. If submission fails, records the error, marks the process FAILED
         *       and acks.</li>
         * </ol>
         *
         * <p>On successful submission the message is not acked here. Presumably the worker acks it later
         * using the delivery tag stored in ZooKeeper — confirm. Deserialization, registry and publish
         * failures that occur before the ZooKeeper nodes are created are logged and the message is
         * left unacked.
         */
        private void launchProcess(MessageContext messageContext) {
            ProcessStatus status = new ProcessStatus();
            status.setState(ProcessState.STARTED);
            try {
                ProcessSubmitEvent event = new ProcessSubmitEvent();
                TBase messageEvent = messageContext.getEvent();
                byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
                ThriftUtils.createThriftFromBytes(bytes, event);
                if (messageContext.isRedeliver()) {
                    if (Factory.getGfacContext().getProcess(event.getProcessId()) != null) {
                        // The process is already active in this instance, so only refresh the delivery tag.
                        // Always return here. Falling through would launch the process a second time. On
                        // the failure path it would also ack the same delivery tag again later.
                        try {
                            updateDeliveryTag(curatorClient, gfacServerName, event, messageContext);
                        } catch (Exception e) {
                            log.error("Error while updating delivery tag for redelivery message , messageId : " +
                                    messageContext.getMessageId(), e);
                            processLaunchSubscriber.sendAck(messageContext.getDeliveryTag());
                        }
                        return;
                    }
                    // Resume from the state recorded in the registry, if any. The server name is written
                    // to ZooKeeper later, by createProcessZKNode(...).
                    ProcessStatus processStatus = (ProcessStatus) Factory.getDefaultExpCatalog()
                            .get(ExperimentCatalogModelType.PROCESS_STATUS, event.getProcessId());
                    if (processStatus != null) {
                        status.setState(processStatus.getState());
                    }
                }
                // update process status
                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                Factory.getDefaultExpCatalog()
                        .update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
                publishProcessStatus(event, status);
                MDC.put(MDCConstants.EXPERIMENT_ID, event.getExperimentId());
                try {
                    createProcessZKNode(curatorClient, gfacServerName, event, messageContext);
                    boolean isCancel = setCancelWatcher(curatorClient, event.getExperimentId(), event.getProcessId());
                    if (isCancel) {
                        if (status.getState() == ProcessState.STARTED) {
                            status.setState(ProcessState.CANCELLING);
                            status.setReason("Process Cancel is triggered");
                            status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                            Factory.getDefaultExpCatalog()
                                    .update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
                            publishProcessStatus(event, status);
                            // NOTE(review): no cancel work is performed between CANCELLING and CANCELED here.
                            status.setState(ProcessState.CANCELED);
                            status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                            Factory.getDefaultExpCatalog()
                                    .update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
                            publishProcessStatus(event, status);
                            processLaunchSubscriber.sendAck(messageContext.getDeliveryTag());
                            return;
                        } else {
                            setCancelData(event.getExperimentId(), event.getProcessId());
                        }
                    }
                    try {
                        submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
                    } catch (TException e) {
                        submissionErrorHandling(status, event, e);
                        processLaunchSubscriber.sendAck(messageContext.getDeliveryTag());
                    }
                } catch (Exception e) {
                    log.error("Error while launching process, processId : " + event.getProcessId(), e);
                    processLaunchSubscriber.sendAck(messageContext.getDeliveryTag());
                }
            } catch (TException e) {
                log.error(e.getMessage(), e); //nobody is listening so nothing to throw
            } catch (RegistryException e) {
                log.error("Error while updating experiment status", e);
            } catch (AiravataException e) {
                log.error("Error while publishing process status", e);
            }
        }
    }

    /**
     * Records a failed submission.
     *
     * <p>The same {@link ErrorModel}, carrying the exception's stack trace, is added twice: first as a
     * PROCESS_ERROR, then with a fresh error id as an EXPERIMENT_ERROR. The process is then marked FAILED
     * in the registry and that status is published.
     */
    private void submissionErrorHandling(ProcessStatus status, ProcessSubmitEvent event, TException e) throws RegistryException, AiravataException {
        StringWriter errors = new StringWriter();
        e.printStackTrace(new PrintWriter(errors));
        ErrorModel errorModel = new ErrorModel();
        errorModel.setUserFriendlyMessage("Process execution failed");
        errorModel.setActualErrorMessage(errors.toString());
        errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
        errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR"));
        Factory.getDefaultExpCatalog()
                .add(ExpCatChildDataType.PROCESS_ERROR, errorModel, event.getProcessId());
        errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR"));
        Factory.getDefaultExpCatalog()
                .add(ExpCatChildDataType.EXPERIMENT_ERROR, errorModel, event.getExperimentId());
        status.setState(ProcessState.FAILED);
        status.setReason("Process execution failed");
        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
        Factory.getDefaultExpCatalog()
                .update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
        publishProcessStatus(event, status);
    }

    /**
     * Writes the cancel-request marker to the process cancel-listener node.
     *
     * <p>The node is {@code {experimentNodePath}/{processId}/cancelListener}. The experiment path is built
     * with {@code GFacUtils.getExperimentNodePath}, the same helper createProcessZKNode uses to create this
     * node.
     */
    private void setCancelData(String experimentId, String processId) throws Exception {
        String processCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(
                GFacUtils.getExperimentNodePath(experimentId), processId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
        log.info("expId: {}, processId: {}, set process cancel data to zookeeper node {}", experimentId, processId, processCancelNodePath);
        curatorClient.setData().withVersion(-1).forPath(processCancelNodePath,
                ZkConstants.ZOOKEEPER_CANCEL_REQEUST.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Checks the experiment's cancel-listener node for a cancel request.
     *
     * <p>If the node already holds the cancel-request marker, returns {@code true} without registering a
     * watcher. Otherwise, registers {@code Factory.getCancelRequestWatcher} on the node, re-reads it, and
     * returns whether the marker is present now.
     */
    private boolean setCancelWatcher(CuratorFramework curatorClient,
                                     String experimentId,
                                     String processId) throws Exception {
        String experimentNodePath = GFacUtils.getExperimentNodePath(experimentId);
        // {experimentNodePath}/cancelListener, set watcher for data changes
        String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
        byte[] bytes = curatorClient.getData().forPath(experimentCancelNode);
        if (bytes != null && new String(bytes, StandardCharsets.UTF_8).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) {
            return true;
        } else {
            bytes = curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId, processId)).forPath(experimentCancelNode);
            return bytes != null && new String(bytes, StandardCharsets.UTF_8).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST);
        }
    }

    /** Publishes a {@link ProcessStatusChangeEvent} for the event's process with the given state. */
    private void publishProcessStatus(ProcessSubmitEvent event, ProcessStatus status) throws AiravataException {
        ProcessIdentifier identifier = new ProcessIdentifier(event.getProcessId(),
                event.getExperimentId(),
                event.getGatewayId());
        ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
        MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
                AiravataUtils.getId(MessageType.PROCESS.name()), event.getGatewayId());
        msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
        statusPublisher.publish(msgCtx);
    }

    /**
     * Creates, if missing, the ZooKeeper nodes for a process and sets their data.
     *
     * <pre>
     * {experimentNodePath}/{processId}                 data = gfacServerName, redelivery watcher registered
     * {experimentNodePath}/{processId}/cancelListener
     * {experimentNodePath}/{processId}/deliveryTag     data = delivery tag of this message
     * {experimentNodePath}/{processId}/token           data = token id
     * </pre>
     */
    private void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName, ProcessSubmitEvent event,
                                     MessageContext messageContext) throws Exception {
        String processId = event.getProcessId();
        String token = event.getTokenId();
        String experimentId = event.getExperimentId();
        long deliveryTag = messageContext.getDeliveryTag();
        // {experimentNodePath}/{processId}: data = server name, plus redelivery watcher
        String experimentNodePath = GFacUtils.getExperimentNodePath(experimentId);
        String zkProcessNodePath = ZKPaths.makePath(experimentNodePath, processId);
        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath);
        curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes(StandardCharsets.UTF_8));
        curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher(experimentId, processId)).forPath(zkProcessNodePath);
        // {experimentNodePath}/{processId}/cancelListener
        String zkProcessCancelPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessCancelPath);
        // {experimentNodePath}/{processId}/deliveryTag: data = delivery tag
        String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath);
        curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag));
        // {experimentNodePath}/{processId}/token: data = token id
        String tokenNodePath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_TOKEN_NODE);
        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), tokenNodePath);
        curatorClient.setData().withVersion(-1).forPath(tokenNodePath, token.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Overwrites the stored delivery tag for a process that already has a ZooKeeper node.
     *
     * <p>The data of {@code {experimentNodePath}/{processId}/deliveryTag} is set to the delivery tag of
     * {@code messageContext}. If the process node does not exist, nothing is written and a warning is
     * logged. {@code gfacServerName} is currently unused.
     */
    private void updateDeliveryTag(CuratorFramework curatorClient, String gfacServerName, ProcessSubmitEvent event,
                                   MessageContext messageContext) throws Exception {
        String experimentId = event.getExperimentId();
        String processId = event.getProcessId();
        long deliveryTag = messageContext.getDeliveryTag();
        String processNodePath = ZKPaths.makePath(GFacUtils.getExperimentNodePath(experimentId), processId);
        Stat stat = curatorClient.checkExists().forPath(processNodePath);
        if (stat != null) {
            // set data of the existing {experimentNodePath}/{processId}/deliveryTag node
            String deliveryTagPath = ZKPaths.makePath(processNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
            curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag));
        } else {
            log.warn("expId: {}, processId: {}, zookeeper node {} not found, delivery tag not updated",
                    experimentId, processId, processNodePath);
        }
    }
}