blob: 6866af3e2630a9810030bf0841663a2f1a910890 [file] [log] [blame]
package org.apache.airavata.helix;
import org.apache.airavata.helix.tasks.command.CommandTask;
import org.apache.airavata.helix.tasks.DataCollectingTask;
import org.apache.airavata.helix.tasks.DataPushingTask;
import org.apache.helix.*;
import org.apache.helix.task.*;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
/**
 * Demo driver that builds a three-job Helix workflow (data download, command
 * execution, data push) and submits it to a Helix cluster through a
 * {@link TaskDriver}, then blocks until the workflow reaches a terminal state.
 *
 * <p>The cluster name, instance name and ZooKeeper address are fixed constants of
 * this class; the program takes no command-line arguments.
 *
 * @author dimuthu
 * @since 1.0.0-SNAPSHOT
 */
public class WorkflowManager {

    private static final Logger logger = LogManager.getLogger(WorkflowManager.class);

    /** Name of the Helix cluster the workflow is submitted to. */
    private static final String CLUSTER_NAME = "AiravataDemoCluster";

    /** Instance name this process uses when connecting to the cluster. */
    private static final String INSTANCE_NAME = "Admin";

    /** ZooKeeper connection string used by the Helix manager. */
    private static final String ZK_ADDRESS = "localhost:2199";

    /** Maximum number of attempts configured for every task in the workflow. */
    private static final int MAX_ATTEMPTS_PER_TASK = 3;

    /**
     * Connects to the cluster as a spectator, starts the workflow and waits for it
     * to reach one of COMPLETED, FAILED, STOPPED or ABORTED.
     *
     * <p>Failures are logged rather than propagated, so the process always returns
     * normally from {@code main}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Workflow workflow = createWorkflow().build();

        org.apache.helix.HelixManager helixManager = HelixManagerFactory.getZKHelixManager(
                CLUSTER_NAME, INSTANCE_NAME, InstanceType.SPECTATOR, ZK_ADDRESS);

        try {
            helixManager.connect();
            TaskDriver taskDriver = new TaskDriver(helixManager);

            // Registered only after a successful connect(): the hook releases the
            // cluster connection when the JVM shuts down.
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    helixManager.disconnect();
                }
            });

            taskDriver.start(workflow);
            logger.info("Started workflow");

            TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(),
                    TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED);
            logger.info("Task state " + taskState.name());
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while running workflow " + workflow.getName()
                    + " as " + INSTANCE_NAME, ex);
        } catch (Exception ex) {
            // This handler covers connect(), start() and polling, so the message must
            // not blame connect() specifically.
            logger.error("Error while running workflow " + workflow.getName()
                    + " as " + INSTANCE_NAME + ", reason: " + ex, ex);
        }
    }

    /**
     * Assembles the workflow definition: three jobs, each holding a single task.
     *
     * @return an unbuilt workflow builder named {@code Airavata_Workflow3}
     */
    private static Workflow.Builder createWorkflow() {
        JobConfig.Builder downloadDataJob =
                singleTaskJob("Download_Task", DataCollectingTask.NAME, "p1");
        JobConfig.Builder commandExecuteJob =
                singleTaskJob("Command_Task", CommandTask.NAME, "p2");
        JobConfig.Builder dataPushJob =
                singleTaskJob("Push_Task", DataPushingTask.NAME, "p3");

        // NOTE(review): expiry is set to 0 as in the original; confirm the intended
        // retention semantics against the Helix version in use.
        Workflow.Builder workflow = new Workflow.Builder("Airavata_Workflow3").setExpiry(0);

        workflow.addJob("downloadDataJob", downloadDataJob);
        workflow.addJob("commandExecuteJob", commandExecuteJob);
        workflow.addJob("dataPushJob", dataPushJob);

        // Both downstream jobs depend only on the download job.
        // NOTE(review): dataPushJob has no dependency on commandExecuteJob, so the two
        // are not ordered relative to each other - confirm that this is intended
        // and that it should not be commandExecuteJob -> dataPushJob.
        workflow.addParentChildDependency("downloadDataJob", "commandExecuteJob");
        workflow.addParentChildDependency("downloadDataJob", "dataPushJob");

        return workflow;
    }

    /**
     * Creates a job configuration that contains exactly one task.
     *
     * @param taskId           id assigned to the task
     * @param command          command name the task is bound to
     * @param instanceGroupTag instance group tag set on the job
     * @return the job builder, with {@link #MAX_ATTEMPTS_PER_TASK} attempts per task
     */
    private static JobConfig.Builder singleTaskJob(String taskId, String command, String instanceGroupTag) {
        List<TaskConfig> tasks = new ArrayList<>();
        tasks.add(new TaskConfig.Builder().setTaskId(taskId).setCommand(command).build());

        return new JobConfig.Builder()
                .addTaskConfigs(tasks)
                .setMaxAttemptsPerTask(MAX_ATTEMPTS_PER_TASK)
                .setInstanceGroupTag(instanceGroupTag);
    }
}