package edu.iu.helix.airavata;

import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.Workflow;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* Created by goshenoy on 6/21/17.
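 *
 * Demo driver that stands up a Helix cluster end-to-end: it creates the
 * cluster in ZooKeeper, launches participant nodes on a thread pool, starts
 * a controller, connects an admin manager in SPECTATOR mode, and submits a
 * task-framework workflow through the Helix TaskDriver. It relies on the
 * companion classes in this package: HelixCluster, ParticipantNode,
 * ControllerNode, and HelixUtil.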
*/
public class HelixClusterManager {
    private static final Logger logger = LogManager.getLogger(HelixClusterManager.class);

    private static final String CONTROLLER_NAME = "HelixDemoController";
    private static final String PARTICIPANT_NAME = "HelixDemoParticipant_";

    private int numWorkers;
    private int numPartitions;
    private String clusterName;
    private String zkAddress;

    private HelixManager helixManager;
    private TaskDriver taskDriver;
    private ControllerNode controllerNode;
    private HelixCluster helixCluster;
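
    /**
     * @param clusterName   name of the Helix cluster to create
     * @param zkAddress     host:port of the ZooKeeper ensemble backing the cluster
     * @param numWorkers    number of participant nodes to launch
     * @param numPartitions number of partitions for the cluster's resources
     */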
    public HelixClusterManager(String clusterName, String zkAddress, int numWorkers, int numPartitions) {
        this.numWorkers = numWorkers;
        this.numPartitions = numPartitions;
        this.clusterName = clusterName;
        this.zkAddress = zkAddress;
    }
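
    /**
     * Brings the cluster up in order: cluster setup, participant nodes,
     * resources, controller, and finally this admin connection. Each
     * ParticipantNode is executed on its own pool thread and is assumed to
     * connect to Helix and then block servicing callbacks, which keeps the
     * (non-daemon) worker threads alive until the JVM exits.
     */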
    public void startHelixCluster() {
        // create the cluster
        helixCluster = new HelixCluster(zkAddress, clusterName, numPartitions);
        helixCluster.setup();
        System.out.println("Successfully created helix cluster: " + clusterName + ", with [ " + numPartitions + " ] partitions.");

        // start the participants on a fixed-size pool, one thread per worker
        Executor executor = Executors.newFixedThreadPool(numWorkers);
        for (int i = 0; i < numWorkers; i++) {
            String participantName = PARTICIPANT_NAME + i;
            ParticipantNode participant = new ParticipantNode(zkAddress, clusterName, participantName);
            executor.execute(participant);
            System.out.println("Successfully started participant node: " + participantName + ", on cluster: " + clusterName);
        }

        // add resources to cluster
        helixCluster.addResourcesToCluster();
        System.out.println("Successfully added resources to cluster: " + clusterName);

        // start the controller
        controllerNode = new ControllerNode(zkAddress, clusterName, CONTROLLER_NAME);
        controllerNode.start();
        System.out.println("Successfully started the controller node: " + CONTROLLER_NAME + ", on cluster: " + clusterName);

        // connect this manager as the cluster admin (spectator)
        connect();
        System.out.println("Successfully started the helix cluster manager (admin), on cluster: " + clusterName);
    }
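
    /**
     * Connects this manager to the cluster as a SPECTATOR (an observer that
     * receives cluster state but serves no partitions), builds the TaskDriver
     * used to submit workflows, and registers a shutdown hook that tears down
     * the admin connection, the controller, and the cluster on JVM exit.
     */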
    private void connect() {
        this.helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "Admin",
                InstanceType.SPECTATOR, zkAddress);
        try {
            this.helixManager.connect();
        } catch (Exception ex) {
            logger.error("Error in connect() for Admin, reason: " + ex, ex);
            // fail fast: a TaskDriver is useless on an unconnected manager
            throw new RuntimeException("Admin failed to connect to cluster: " + clusterName, ex);
        }
        this.taskDriver = new TaskDriver(helixManager);
        logger.debug("HelixManager Admin connected.");

        // tear everything down when the JVM exits
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            disconnect();
            controllerNode.stop();
            helixCluster.disconnect();
        }));
    }
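
    /**
     * Disconnects the admin HelixManager from the cluster, if connected.
     */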
    private void disconnect() {
        if (helixManager != null) {
            System.out.println("HelixManager Admin disconnecting from cluster: " + clusterName);
            helixManager.disconnect();
        }
    }
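
    /**
     * Builds the workflow for the given DAG type (HelixUtil.getWorkflow is
     * assumed to translate the enum into a task-framework Workflow), submits
     * it, and blocks until the workflow reaches COMPLETED or FAILED.
     *
     * @return true if the workflow was submitted and reached a terminal
     *         state; note that a FAILED workflow also returns true.
     */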
    public boolean submitDag(HelixUtil.DAGType dagType) {
        try {
            Workflow workflow = HelixUtil.getWorkflow(dagType);
            taskDriver.start(workflow);
            System.out.println("Started workflow for DagType: " + dagType + ", in cluster: " + clusterName);
            // block until the workflow reaches a terminal state
            taskDriver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED, TaskState.FAILED);
            return true;
        } catch (Exception ex) {
            logger.error("Error submitting Dag for type: " + dagType + ", reason: " + ex, ex);
            return false;
        }
    }
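
    /**
     * Entry point: brings up a 3-worker, single-partition demo cluster using
     * the cluster name and ZooKeeper address from HelixUtil, runs the SSH
     * workflow once, and exits.
     */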
    public static void main(String[] args) {
        String clusterName = HelixUtil.CLUSTER_NAME;
        String zkAddress = HelixUtil.ZK_ADDRESS;
        int numWorkers = 3;
        int numPartitions = 1;
        try {
            System.out.println("Starting helix manager for cluster [ " + clusterName + " ], " +
                    "on ZK server [ " + zkAddress + " ], " +
                    "with [ " + numWorkers + " ] workers, " +
                    "having [ " + numPartitions + " ] partitions.");
            HelixClusterManager manager = new HelixClusterManager(clusterName, zkAddress, numWorkers, numPartitions);
            manager.startHelixCluster();

            System.out.println("Submitting Workflow for DagType: " + HelixUtil.DAGType.SSH);
            if (manager.submitDag(HelixUtil.DAGType.SSH)) {
                System.out.println("Workflow reached a terminal state for Dag: " + HelixUtil.DAGType.SSH);
            } else {
                throw new Exception("Failed to run workflow for Dag: " + HelixUtil.DAGType.SSH);
            }
        } catch (Exception ex) {
            logger.error("Something went wrong while running helix cluster manager. Reason: " + ex, ex);
        }
        System.out.println("*** Exiting System ***");
        // the participant pool threads are non-daemon, so exit explicitly;
        // this also fires the shutdown hook registered in connect()
        System.exit(0);
    }
}