blob: b2b634d9aba0958f516ab40ffb841afaceeaad64 [file] [log] [blame]
package edu.iu.helix.airavata;
import edu.iu.helix.airavata.tasks.HelixTaskA;
import edu.iu.helix.airavata.tasks.HelixTaskB;
import edu.iu.helix.airavata.tasks.HelixTaskC;
import edu.iu.helix.airavata.tasks.HelixTaskD;
import edu.iu.helix.airavata.tasks.ssh.SSHKeyAuthentication;
import edu.iu.helix.airavata.tasks.ssh.SSHServerInfo;
import edu.iu.helix.airavata.tasks.ssh.SSHTask;
import edu.iu.helix.airavata.tasks.ssh.SSHTaskContext;
import org.apache.commons.io.IOUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.helix.task.*;
import org.jboss.netty.util.internal.ThreadLocalRandom;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
 * Utility for building Apache Helix task workflows: three synthetic demo DAG
 * shapes (TYPE_A/B/C over HelixTaskA-D) and an SSH workflow that creates a
 * remote directory, copies job files, and submits a PBS job via qsub. Also
 * holds the ZooKeeper node-name constants and Java-serialization helpers used
 * to stage SSH task contexts in ZooKeeper.
 *
 * Created by goshenoy on 6/21/17.
 */
public class HelixUtil {

    public static final String TASK_STATE_DEF = "Task";
    public static final String ZK_ADDRESS = "localhost:2199";
    public static final String CLUSTER_NAME = "HelixDemoCluster";

    // SSH workflow / task identifiers (also used as ZooKeeper node names).
    public static final String SSH_WORKFLOW = "SSH_Workflow";
    public static final String CREATE_DIR_TASK = "Task_CreateDir";
    public static final String COPY_PBS_TASK = "Task_CopyPBS";
    public static final String COPY_PY_TASK = "Task_CopyPY";
    public static final String QSUB_TASK = "Task_QSUB";

    // ZooKeeper data keys for SSH connection / task parameters.
    public static final String USERNAME = "username";
    public static final String PRIVATE_KEY = "private_key";
    public static final String PUBLIC_KEY = "public_key";
    public static final String HOSTNAME = "hostname";
    public static final String PORT = "port";
    public static final String COMMAND = "command";
    public static final String SRC_PATH = "src_path";
    public static final String DEST_PATH = "dest_path";

    /** Supported workflow DAG shapes. */
    public enum DAGType {
        TYPE_A,
        TYPE_B,
        TYPE_C,
        SSH
    }

    /** Utility class — not instantiable. */
    private HelixUtil() {
    }

    /**
     * Builds a ready-to-run Helix workflow for the given DAG type.
     *
     * @param dagType shape of the workflow DAG to build
     * @return the built workflow
     * @throws Exception if the SSH workflow data cannot be written to ZooKeeper
     */
    public static Workflow getWorkflow(DAGType dagType) throws Exception {
        Workflow.Builder workflowBuilder = getWorkflowBuilder(dagType);
        return workflowBuilder.build();
    }

    /**
     * Builds the workflow definition (jobs plus parent/child dependencies)
     * for the requested DAG type.
     *
     * @param dagType shape of the workflow DAG to build
     * @return a configured (but not yet built) workflow builder
     * @throws Exception if SSH workflow data cannot be written to ZooKeeper
     */
    private static Workflow.Builder getWorkflowBuilder(DAGType dagType) throws Exception {
        Workflow.Builder workflow =
                (dagType == DAGType.SSH) ? buildSshWorkflow() : buildDemoWorkflow();

        // Wire up job dependencies according to the requested DAG shape.
        switch (dagType) {
            case TYPE_A:
                // a -> b -> c -> d
                workflow.addParentChildDependency("helix_job_a", "helix_job_b");
                workflow.addParentChildDependency("helix_job_b", "helix_job_c");
                workflow.addParentChildDependency("helix_job_c", "helix_job_d");
                break;
            case TYPE_B:
                // a -> c -> d -> b
                workflow.addParentChildDependency("helix_job_a", "helix_job_c");
                workflow.addParentChildDependency("helix_job_c", "helix_job_d");
                workflow.addParentChildDependency("helix_job_d", "helix_job_b");
                break;
            case TYPE_C:
                // a -> d -> b -> c
                workflow.addParentChildDependency("helix_job_a", "helix_job_d");
                workflow.addParentChildDependency("helix_job_d", "helix_job_b");
                workflow.addParentChildDependency("helix_job_b", "helix_job_c");
                break;
            case SSH:
                // create-dir -> copy-files -> qsub
                workflow.addParentChildDependency("createDirJob", "copyFilesJob");
                workflow.addParentChildDependency("copyFilesJob", "qsubJob");
                break;
        }
        return workflow;
    }

    /**
     * Builds the SSH workflow (create-dir, copy-files, qsub jobs). The task
     * contexts are persisted to ZooKeeper first, since the SSH tasks read
     * their parameters from there at runtime.
     */
    private static Workflow.Builder buildSshWorkflow() throws Exception {
        if (!setWorkflowData()) {
            throw new Exception("Failed to create zk data for SSH workflow");
        }

        // create-dir job: single task.
        JobConfig.Builder createDirJob = singleTaskJob(CREATE_DIR_TASK, SSHTask.TASK_COMMAND, 3);

        // copy-files job: two tasks (PBS script and python script).
        List<TaskConfig> copyFilesTasks = new ArrayList<>();
        copyFilesTasks.add(new TaskConfig.Builder().setTaskId(COPY_PBS_TASK).setCommand(SSHTask.TASK_COMMAND).build());
        copyFilesTasks.add(new TaskConfig.Builder().setTaskId(COPY_PY_TASK).setCommand(SSHTask.TASK_COMMAND).build());
        JobConfig.Builder copyFilesJob =
                new JobConfig.Builder().addTaskConfigs(copyFilesTasks).setMaxAttemptsPerTask(3);

        // qsub job: single attempt only — a blind retry could submit the
        // cluster job twice.
        JobConfig.Builder qsubJob = singleTaskJob(QSUB_TASK, SSHTask.TASK_COMMAND, 1);

        Workflow.Builder workflow = new Workflow.Builder(SSH_WORKFLOW).setExpiry(0);
        workflow.addJob("createDirJob", createDirJob);
        workflow.addJob("copyFilesJob", copyFilesJob);
        workflow.addJob("qsubJob", qsubJob);
        return workflow;
    }

    /** Builds the synthetic demo workflow: four single-task jobs (a-d). */
    private static Workflow.Builder buildDemoWorkflow() {
        Workflow.Builder workflow = new Workflow.Builder("helix_workflow").setExpiry(0);
        workflow.addJob("helix_job_a", singleTaskJob("helix_task_a", HelixTaskA.TASK_COMMAND, 3));
        workflow.addJob("helix_job_b", singleTaskJob("helix_task_b", HelixTaskB.TASK_COMMAND, 3));
        workflow.addJob("helix_job_c", singleTaskJob("helix_task_c", HelixTaskC.TASK_COMMAND, 3));
        workflow.addJob("helix_job_d", singleTaskJob("helix_task_d", HelixTaskD.TASK_COMMAND, 3));
        return workflow;
    }

    /**
     * Creates a job config holding exactly one task.
     *
     * @param taskId      task identifier
     * @param command     task command registered with the Helix task framework
     * @param maxAttempts maximum attempts per task before the job fails
     */
    private static JobConfig.Builder singleTaskJob(String taskId, String command, int maxAttempts) {
        List<TaskConfig> tasks = new ArrayList<>();
        tasks.add(new TaskConfig.Builder().setTaskId(taskId).setCommand(command).build());
        return new JobConfig.Builder().addTaskConfigs(tasks).setMaxAttemptsPerTask(maxAttempts);
    }

    /** Generates a pseudo-random workflow name such as {@code workflow_1234}. */
    private static String generateWorkflowName() {
        // Use the JDK's ThreadLocalRandom, not netty's internal copy of it.
        return "workflow_" + java.util.concurrent.ThreadLocalRandom.current().nextInt(9999);
    }

    /**
     * Serializes the SSH task contexts (create-dir, copy PBS, copy PY, qsub)
     * and stores them as ZooKeeper nodes under the SSH workflow.
     *
     * @return {@code true} if every node was written, {@code false} on any
     *         failure (the failure is logged; callers treat {@code false} as
     *         "workflow data unavailable")
     */
    private static boolean setWorkflowData() {
        try {
            CuratorFramework curatorClient = ZkUtils.getCuratorClient();

            // Key-based auth against BigRed2; keys/known_hosts are bundled resources.
            SSHKeyAuthentication br2SshAuthentication = new SSHKeyAuthentication(
                    "snakanda",
                    IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa")),
                    IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa.pub")),
                    "dummy",
                    HelixUtil.class.getClassLoader().getResource("ssh/known_hosts").getPath(),
                    false
            );
            SSHServerInfo br2 = new SSHServerInfo("snakanda", "bigred2.uits.iu.edu", br2SshAuthentication, 22);

            SSHTaskContext createDirTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.EXECUTE_COMMAND,
                    br2SshAuthentication, null, br2, "mkdir -p airavata");
            SSHTaskContext qsubTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.EXECUTE_COMMAND,
                    br2SshAuthentication, null, br2, "qsub ~/airavata/job_tf.pbs");
            SSHTaskContext copyPbsTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.FILE_COPY,
                    br2SshAuthentication, null, br2,
                    HelixUtil.class.getClassLoader().getResource("job_tf.pbs").getPath(), "~/airavata/");
            // FIX: this task previously copied job_tf.pbs a second time; the
            // COPY_PY task must copy the python script code_tf.py.
            SSHTaskContext copyPyTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.FILE_COPY,
                    br2SshAuthentication, null, br2,
                    HelixUtil.class.getClassLoader().getResource("code_tf.py").getPath(), "~/airavata/");

            ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, CREATE_DIR_TASK, getBytes(createDirTC));
            ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, COPY_PBS_TASK, getBytes(copyPbsTC));
            ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, COPY_PY_TASK, getBytes(copyPyTC));
            ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, QSUB_TASK, getBytes(qsubTC));
        } catch (Exception ex) {
            // Best-effort: report the failure and let the caller fall back.
            ex.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Serializes an object using Java serialization.
     *
     * @param object object to serialize (must implement {@link Serializable})
     * @return the serialized bytes
     * @throws Exception if serialization fails
     */
    private static byte[] getBytes(Object object) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // try-with-resources guarantees the stream is flushed and closed.
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(object);
        }
        return bos.toByteArray();
    }

    /**
     * Deserializes an object previously written by {@link #getBytes(Object)}.
     * NOTE(review): Java native deserialization trusts its input — only call
     * this on data this application wrote itself (e.g. its own ZK nodes).
     *
     * @param objectBytes serialized bytes
     * @return the deserialized object
     * @throws Exception if deserialization fails
     */
    public static Object getObject(byte[] objectBytes) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(objectBytes))) {
            return in.readObject();
        }
    }
}