blob: 6236a6c7d51809dd637a77d6db333263153b841b [file] [log] [blame]
package org.apache.airavata.helix.impl.task.cancel;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.core.AbstractTask;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
import org.apache.airavata.helix.task.api.annotation.TaskParam;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskResult;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Helix task that stops another workflow, identified by {@link #cancellingWorkflowName},
 * and then waits up to {@link #waitTime} seconds for that workflow to reach one of a set
 * of target states.
 *
 * <p>The task opens its own spectator connection to the Helix cluster in
 * {@link #init(HelixManager, String, String, String)} and releases it at the end of
 * {@link #onRun(TaskHelper)} (or immediately if initialisation fails).
 */
@TaskDef(name = "Workflow Cancellation Task")
public class WorkflowCancellationTask extends AbstractTask {

    private static final Logger logger = LoggerFactory.getLogger(WorkflowCancellationTask.class);

    /** Driver bound to {@link #helixManager}; used to query, stop and poll the target workflow. */
    private TaskDriver taskDriver;

    /** Spectator connection created by this task in {@code init}. */
    private HelixManager helixManager;

    /**
     * JVM shutdown hook that disconnects {@link #helixManager}. It is kept so that it can be
     * unregistered once the manager has been disconnected; otherwise every executed task would
     * leave a hook (and the objects it references) registered until the JVM exits.
     */
    private Thread shutdownHook;

    /** Name of the workflow that should be stopped. */
    @TaskParam(name = "Cancelling Workflow")
    private String cancellingWorkflowName;

    /** Maximum time, in seconds, to wait for the workflow state to change after the stop request. */
    @TaskParam(name = "Waiting time to monitor status (s)")
    private int waitTime = 20;

    /**
     * Initialises the task and opens a dedicated spectator connection to the Helix cluster.
     *
     * <p>The cluster name is read from the {@code helix.cluster.name} server setting and the
     * task name is used as the instance name of the spectator.
     *
     * @param manager      Helix manager handed to the parent class
     * @param workflowName name of the workflow this task belongs to
     * @param jobName      name of the job this task belongs to
     * @param taskName     name of this task
     * @throws RuntimeException if the spectator connection or the task driver can not be created;
     *                          any partially opened connection is released first
     */
    @Override
    public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
        super.init(manager, workflowName, jobName, taskName);

        try {
            HelixManager spectator = HelixManagerFactory.getZKHelixManager(
                    ServerSettings.getSetting("helix.cluster.name"), taskName,
                    InstanceType.SPECTATOR, ServerSettings.getZookeeperConnection());
            helixManager = spectator;
            spectator.connect();

            // The hook captures only the local manager reference, not this task instance.
            Thread hook = new Thread(() -> {
                if (spectator.isConnected()) {
                    spectator.disconnect();
                }
            });
            Runtime.getRuntime().addShutdownHook(hook);
            shutdownHook = hook;

            taskDriver = new TaskDriver(spectator);
        } catch (Exception e) {
            releaseHelixManager();
            logger.error("Failed to build Helix Task driver in {}", taskName, e);
            throw new RuntimeException("Failed to build Helix Task driver in " + taskName, e);
        }
    }

    /**
     * Stops the configured workflow and waits for its state to change.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>workflow config or context not found: reported as success, nothing is stopped;</li>
     *   <li>reading the context or issuing the stop fails: reported through
     *       {@code onFail(message, false)};</li>
     *   <li>stop issued: reported as success, regardless of whether polling for the new
     *       state succeeds.</li>
     * </ul>
     * The spectator connection opened in {@code init} is always released before returning.
     *
     * @param helper task helper supplied by the framework (not used by this task)
     * @return the result produced by {@code onSuccess} or {@code onFail}
     */
    @Override
    public TaskResult onRun(TaskHelper helper) {
        logger.info("Cancelling workflow {}", cancellingWorkflowName);

        try {
            if (taskDriver.getWorkflowConfig(cancellingWorkflowName) == null) {
                // Workflow could be already deleted by cleanup agents
                String message = "Can not find a workflow with name " + cancellingWorkflowName + " but continuing";
                logger.warn(message);
                return onSuccess(message);
            }

            try {
                WorkflowContext workflowContext = taskDriver.getWorkflowContext(cancellingWorkflowName);
                // if the workflow can not be found, ignore it
                if (workflowContext == null) {
                    String message = "Can not find a workflow with id " + cancellingWorkflowName + ". So ignoring";
                    logger.warn(message);
                    return onSuccess(message);
                }

                // Logged through a placeholder so that an unexpected null state can not
                // abort the cancellation before stop() is called.
                TaskState workflowState = workflowContext.getWorkflowState();
                logger.info("Current state of workflow {} : {}", cancellingWorkflowName, workflowState);

                taskDriver.stop(cancellingWorkflowName);
            } catch (Exception e) {
                logger.error("Failed to stop workflow {}", cancellingWorkflowName, e);
                // in case of an error, retry
                // NOTE(review): the 'false' argument presumably marks the failure as non-fatal
                // so the task can be retried - confirm against AbstractTask.onFail.
                return onFail("Failed to stop workflow " + cancellingWorkflowName + ": " + e.getMessage(), false);
            }

            try {
                logger.info("Waiting maximum {}s for workflow {} state to change", waitTime, cancellingWorkflowName);
                // 1000L forces long arithmetic so large wait times do not overflow int.
                TaskState newWorkflowState = taskDriver.pollForWorkflowState(cancellingWorkflowName, waitTime * 1000L,
                        TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED,
                        TaskState.NOT_STARTED);
                logger.info("Workflow {} state changed to {}", cancellingWorkflowName, newWorkflowState);
                return onSuccess("Successfully cancelled workflow " + cancellingWorkflowName);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt flag so callers further up can observe the interruption.
                    Thread.currentThread().interrupt();
                }
                // The stop request was already issued, so a monitoring failure is not treated as a task failure.
                logger.warn("Failed while watching workflow to stop {}", cancellingWorkflowName, e);
                return onSuccess("Failed while watching workflow to stop " + cancellingWorkflowName + ". But continuing");
            }
        } finally {
            releaseHelixManager();
        }
    }

    /** No cancellation-specific work is performed by this task. */
    @Override
    public void onCancel() {
    }

    /**
     * Unregisters the shutdown hook (if one was registered) and disconnects the spectator
     * connection if it is still connected. Failures are logged and never propagated, so this
     * is safe to call from {@code catch} and {@code finally} blocks.
     */
    private void releaseHelixManager() {
        Thread hook = shutdownHook;
        shutdownHook = null;
        if (hook != null) {
            try {
                Runtime.getRuntime().removeShutdownHook(hook);
            } catch (IllegalStateException ignored) {
                // JVM shutdown is already in progress; the hook can no longer be removed
                // and will take care of disconnecting on its own.
            }
        }

        try {
            if (helixManager != null && helixManager.isConnected()) {
                helixManager.disconnect();
            }
        } catch (Exception e) {
            logger.warn("Failed to disconnect helix manager", e);
        }
    }

    /**
     * @return name of the workflow to be stopped
     */
    public String getCancellingWorkflowName() {
        return cancellingWorkflowName;
    }

    /**
     * @param cancellingWorkflowName name of the workflow to be stopped
     */
    public void setCancellingWorkflowName(String cancellingWorkflowName) {
        this.cancellingWorkflowName = cancellingWorkflowName;
    }

    /**
     * @return maximum time in seconds to wait for the workflow state to change
     */
    public int getWaitTime() {
        return waitTime;
    }

    /**
     * @param waitTime maximum time in seconds to wait for the workflow state to change
     */
    public void setWaitTime(int waitTime) {
        this.waitTime = waitTime;
    }
}