blob: d01f27526efde77770b07f62041d1f7f585af5cd [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.airavata.helix.core;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.core.participant.HelixParticipant;
import org.apache.airavata.helix.core.util.MonitoringUtil;
import org.apache.airavata.helix.core.util.TaskUtil;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
import org.apache.airavata.helix.task.api.annotation.TaskParam;
import org.apache.airavata.patform.monitoring.CountMonitor;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.helix.HelixManager;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskResult;
import org.apache.helix.task.UserContentStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TODO: Class level comments please
*
* @author dimuthu
* @since 1.0.0-SNAPSHOT
*/
public abstract class AbstractTask extends UserContentStore implements Task {
private final static Logger logger = LoggerFactory.getLogger(AbstractTask.class);
private final static CountMonitor taskInitCounter = new CountMonitor("task_init_count");
private final static CountMonitor taskRunCounter = new CountMonitor("task_run_count");
private final static CountMonitor taskCancelCounter = new CountMonitor("task_cancel_count");
private final static CountMonitor taskFailCounter = new CountMonitor("task_fail_count");
private final static CountMonitor taskCompleteCounter = new CountMonitor("task_complete_count");
private static final String NEXT_JOB = "next-job";
private static final String WORKFLOW_STARTED = "workflow-started";
private static CuratorFramework curatorClient = null;
@TaskParam(name = "taskId")
private String taskId;
@TaskOutPort(name = "Next Task")
private OutPort nextTask;
private TaskCallbackContext callbackContext;
private TaskHelper taskHelper;
private HelixParticipant participant;
@TaskParam(name = "Retry Count")
private int retryCount = 3;
@Override
public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
super.init(manager, workflowName, jobName, taskName);
try {
taskInitCounter.inc();
TaskUtil.deserializeTaskData(this, this.callbackContext.getTaskConfig().getConfigMap());
} catch (Exception e) {
taskFailCounter.inc();
logger.error("Deserialization of task parameters failed", e);
}
if (participant != null) {
participant.registerRunningTask(this);
} else {
logger.warn("Task with id: " + taskId + " is not registered since the participant is not set");
}
}
@Override
public final TaskResult run() {
try {
taskRunCounter.inc();
boolean isThisNextJob = getUserContent(WORKFLOW_STARTED, Scope.WORKFLOW) == null ||
this.callbackContext.getJobConfig().getJobId()
.equals(this.callbackContext.getJobConfig().getWorkflow() + "_" + getUserContent(NEXT_JOB, Scope.WORKFLOW));
return isThisNextJob ? onRun(this.taskHelper) : new TaskResult(TaskResult.Status.COMPLETED, "Not a target job");
} finally {
if (participant != null) {
participant.unregisterRunningTask(this);
} else {
logger.warn("Task with id: " + taskId + " is not unregistered since the participant is not set");
}
}
}
@Override
public final void cancel() {
try {
taskCancelCounter.inc();
logger.info("Cancelling task " + taskId);
onCancel();
} finally {
if (participant != null) {
participant.unregisterRunningTask(this);
} else {
logger.warn("Task with id: " + taskId + " is not unregistered since the participant is not set");
}
}
}
public abstract TaskResult onRun(TaskHelper helper);
public abstract void onCancel();
protected TaskResult onSuccess(String message) {
taskCompleteCounter.inc();
String successMessage = "Task " + getTaskId() + " completed." + (message != null ? " Message : " + message : "");
logger.info(successMessage);
return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
}
protected TaskResult onFail(String reason, boolean fatal) {
taskFailCounter.inc();
return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, reason);
}
protected void publishErrors(Throwable e) {
// TODO Publish through kafka channel with task and workflow id
e.printStackTrace();
}
public void sendNextJob(String jobId) {
putUserContent(WORKFLOW_STARTED, "TRUE", Scope.WORKFLOW);
if (jobId != null) {
putUserContent(NEXT_JOB, jobId, Scope.WORKFLOW);
}
}
protected void setContextVariable(String key, String value) {
putUserContent(key, value, Scope.WORKFLOW);
}
protected String getContextVariable(String key) {
return getUserContent(key, Scope.WORKFLOW);
}
// Getters and setters
public String getTaskId() {
return taskId;
}
public AbstractTask setTaskId(String taskId) {
this.taskId = taskId;
return this;
}
public TaskCallbackContext getCallbackContext() {
return callbackContext;
}
public AbstractTask setCallbackContext(TaskCallbackContext callbackContext) {
this.callbackContext = callbackContext;
return this;
}
public TaskHelper getTaskHelper() {
return taskHelper;
}
public AbstractTask setTaskHelper(TaskHelper taskHelper) {
this.taskHelper = taskHelper;
return this;
}
protected int getCurrentRetryCount() throws Exception {
return MonitoringUtil.getTaskRetryCount(getCuratorClient(), taskId);
}
protected void markNewRetry(int currentRetryCount) throws Exception {
MonitoringUtil.increaseTaskRetryCount(getCuratorClient(), taskId, currentRetryCount);
}
public int getRetryCount() {
return retryCount;
}
public void setRetryCount(int retryCount) {
// set the default retry count to 1
this.retryCount = retryCount <= 0 ? 1 : retryCount;
}
public OutPort getNextTask() {
return nextTask;
}
public void setNextTask(OutPort nextTask) {
this.nextTask = nextTask;
}
protected synchronized CuratorFramework getCuratorClient() {
if (curatorClient == null) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
try {
this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy);
this.curatorClient.start();
} catch (ApplicationSettingsException e) {
logger.error("Failed to create curator client ", e);
throw new RuntimeException(e);
}
}
return curatorClient;
}
public AbstractTask setParticipant(HelixParticipant participant) {
this.participant = participant;
return this;
}
}