blob: 03589e75a1cc76bc26e7b0424dd017873ededb95 [file] [log] [blame]
package org.apache.airavata.helix.api;
import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
import org.apache.airavata.k8s.compute.api.ComputeOperations;
import org.apache.airavata.k8s.compute.impl.MockComputeOperation;
import org.apache.airavata.k8s.compute.impl.SSHComputeOperations;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskResult;
import org.apache.helix.task.UserContentStore;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.web.client.RestTemplate;
import java.util.Properties;
/**
 * Base class for Helix-executed workflow tasks. Resolves the task and process ids
 * from the task configuration, publishes task status events to a Kafka topic, and
 * routes a completed task's output port to the next job in the workflow via
 * Helix workflow-scoped user content.
 *
 * @author dimuthu
 * @since 1.0.0-SNAPSHOT
 */
public abstract class AbstractTask extends UserContentStore implements Task {

    /** Workflow-scoped user-content key naming the job that should run next. */
    public static final String NEXT_JOB = "next-job";
    /** Workflow-scoped user-content key set once any task has routed an out port. */
    public static final String WORKFLOW_STARTED = "workflow-started";
    /** Task-config key holding this task's numeric id. */
    public static final String TASK_ID = "task_id";
    /** Task-config key holding the owning process's numeric id. */
    public static final String PROCESS_ID = "process_id";

    // Configurable values
    // NOTE(review): these look like they should come from external configuration
    // rather than being hard-coded defaults — confirm deployment expectations.
    private final String apiServerUrl = "localhost:8080";
    private final String kafkaBootstrapUrl = "localhost:9092";
    private final String eventTopic = "airavata-task-event";

    private final TaskCallbackContext callbackContext;
    private final RestTemplate restTemplate;
    // NOTE(review): this producer is never closed; if tasks are created frequently
    // this leaks Kafka client resources — confirm lifecycle and add a shutdown hook.
    private Producer<String, String> eventProducer;

    private final long processId;
    private final long taskId;

    /**
     * Builds the task from its Helix callback context, resolving the mandatory
     * {@value #TASK_ID} and {@value #PROCESS_ID} entries from the task config map,
     * and wiring the REST client and Kafka event producer.
     *
     * @param callbackContext Helix task callback context carrying the task config
     * @throws IllegalArgumentException if a required config entry is missing
     * @throws NumberFormatException    if a required config entry is not a valid long
     */
    public AbstractTask(TaskCallbackContext callbackContext) {
        this.callbackContext = callbackContext;
        this.taskId = parseRequiredLong(TASK_ID);
        this.processId = parseRequiredLong(PROCESS_ID);
        this.restTemplate = new RestTemplate();
        initializeKafkaEventProducer();
        // NOTE(review): calling the overridable init() from the constructor means
        // subclass overrides run before the subclass is fully constructed — confirm
        // subclasses do not rely on their own fields inside init().
        init();
    }

    /**
     * Reads a mandatory numeric entry from the task config map.
     * Fails fast with a clear message instead of the cryptic
     * {@code NumberFormatException: null} that {@code Long.parseLong(null)} throws.
     */
    private long parseRequiredLong(String key) {
        String value = this.callbackContext.getTaskConfig().getConfigMap().get(key);
        if (value == null) {
            throw new IllegalArgumentException(
                    "Required task config entry '" + key + "' is missing");
        }
        return Long.parseLong(value);
    }

    /** @return the Helix callback context this task was created with */
    public TaskCallbackContext getCallbackContext() {
        return callbackContext;
    }

    /**
     * Helix entry point. Runs {@link #onRun()} only when this job is the routed
     * target: either no task has started the workflow yet (first job), or this
     * job's id matches the {@value #NEXT_JOB} selected by the upstream task.
     * Non-target jobs complete immediately without executing task logic.
     */
    @Override
    public final TaskResult run() {
        boolean isThisNextJob = getUserContent(WORKFLOW_STARTED, Scope.WORKFLOW) == null ||
                this.callbackContext.getJobConfig().getJobId()
                        .equals(this.callbackContext.getJobConfig().getWorkflow()
                                + "_" + getUserContent(NEXT_JOB, Scope.WORKFLOW));
        if (isThisNextJob) {
            return onRun();
        } else {
            return new TaskResult(TaskResult.Status.COMPLETED, "Not a target job");
        }
    }

    /** Helix cancellation hook; delegates to the subclass's {@link #onCancel()}. */
    @Override
    public final void cancel() {
        onCancel();
    }

    /** Optional subclass initialization hook, invoked once from the constructor. */
    public void init() {
    }

    /** Executes the task's actual work; called only when this job is the routed target. */
    public abstract TaskResult onRun();

    /** Cancels the task's actual work. */
    public abstract void onCancel();

    /**
     * Routes workflow execution to the job wired to the given output port.
     * Marks the workflow as started and, if the task config maps
     * {@code OUT_<port>} to a job id, records that job as {@value #NEXT_JOB}.
     *
     * @param port logical output port name (looked up as {@code OUT_<port>})
     */
    public void sendToOutPort(String port) {
        putUserContent(WORKFLOW_STARTED, "TRUE", Scope.WORKFLOW);
        String outJob = getCallbackContext().getTaskConfig().getConfigMap().get("OUT_" + port);
        if (outJob != null) {
            putUserContent(NEXT_JOB, outJob, Scope.WORKFLOW);
        }
    }

    /** @return shared REST client for calling the API server */
    public RestTemplate getRestTemplate() {
        return restTemplate;
    }

    /** @return base URL of the Airavata API server */
    public String getApiServerUrl() {
        return apiServerUrl;
    }

    /**
     * Selects a {@link ComputeOperations} implementation matching the compute
     * resource's communication type ({@code "SSH"} or {@code "Mock"}).
     *
     * @param computeResource resource describing host, credentials and communication type
     * @return an operations handle for the resource
     * @throws Exception if the communication type is not supported
     */
    public ComputeOperations fetchComputeResourceOperation(ComputeResource computeResource) throws Exception {
        ComputeOperations operations;
        if ("SSH".equals(computeResource.getCommunicationType())) {
            operations = new SSHComputeOperations(computeResource.getHost(),
                    computeResource.getUserName(), computeResource.getPassword());
        } else if ("Mock".equals(computeResource.getCommunicationType())) {
            operations = new MockComputeOperation(computeResource.getHost());
        } else {
            // Fixed double-negative in the original message
            // ("No compatible communication method {X} not found ...").
            throw new Exception("No compatible communication method {"
                    + computeResource.getCommunicationType()
                    + "} found for compute resource " + computeResource.getName());
        }
        return operations;
    }

    /**
     * Creates the Kafka producer used for publishing task status events.
     * String keys/values; acks=all for durability, no retries.
     */
    public void initializeKafkaEventProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.kafkaBootstrapUrl);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        eventProducer = new KafkaProducer<>(props);
    }

    /**
     * Publishes a task status event as a CSV record
     * {@code processId,taskId,status,reason} to the event topic.
     *
     * @param status numeric status code
     * @param reason human-readable status reason
     */
    public void publishTaskStatus(long status, String reason) {
        eventProducer.send(new ProducerRecord<>(
                this.eventTopic,
                String.join(",",
                        Long.toString(this.processId),
                        Long.toString(this.taskId),
                        Long.toString(status),
                        reason)));
    }

    /** @return id of the process this task belongs to */
    public long getProcessId() {
        return processId;
    }

    /** @return id of this task */
    public long getTaskId() {
        return taskId;
    }
}