blob: aee4e062c556e2243edf6c45fb08a72fb7ed6193 [file] [log] [blame]
package edu.iu.helix.play;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.examples.OnlineOfflineStateModelFactory;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.*;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.testng.Assert;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by goshenoy on 6/18/17.
*/
public class HelixDAGFramework {
private static Logger logger = LogManager.getLogger(HelixDAGFramework.class);
private static final String ZK_ADDRESS = "localhost:2199";
private static final String PARTICIPANT_ADDRESS = "localhost:12918";
private static final String CLUSTER_NAME = "HelixDagCluster";
private static final String RESOURCE_NAME = "HelixResource";
private static final String CONTROLLER_NAME = "HelixController";
private static final String ADMIN_NAME = "HelixAdmin";
private static final String WORKFLOW_NAME = "helix_workflow";
private static final String JOB_NAME = "helix_job";
private static final String TASK_A_NAME = "helix_task_id_a";
private static final String TASK_B_NAME = "helix_task_id_b";
private static final String ONLINE_OFFLINE = "OnlineOffline";
private static HelixManager adminManager;
private static ZKHelixManager participantManager;
private static ZKHelixManager controllerManager;
private static ClusterSetup _setupTool;
public static void main(String[] args) {
try {
// create cluster
createCluster();
// add instance, resource to cluster
addInstanceToCluster();
addResourceToCluster();
// create participant, controller & admin manager
createParticipantManager();
createControllerManager();
createAdminManager();
// verifying cluster state
verifyClusterState();
// create task-driver
logger.info("edu.iu.helix.play.HelixDAGFramework | Creating TaskDriver.");
TaskDriver taskDriver = new TaskDriver(adminManager);
// create task-config list
logger.info("edu.iu.helix.play.HelixDAGFramework | Creating TaskConfig list.");
List<TaskConfig> taskConfigList1 = new ArrayList<TaskConfig>();
List<TaskConfig> taskConfigList2 = new ArrayList<TaskConfig>();
taskConfigList1.add(
new TaskConfig.Builder().setTaskId(TASK_A_NAME).setCommand(HelixTaskA.TASK_COMMAND).build()
);
taskConfigList2.add(
new TaskConfig.Builder().setTaskId(TASK_B_NAME).setCommand(HelixTaskB.TASK_COMMAND).build()
);
// create job-config-builder
logger.info("edu.iu.helix.play.HelixDAGFramework | Creating JobConfig.Builder.");
JobConfig.Builder jobConfigBuilder1 = new JobConfig.Builder().addTaskConfigs(taskConfigList1);
JobConfig.Builder jobConfigBuilder2 = new JobConfig.Builder().addTaskConfigs(taskConfigList2);
// create workflow-builder & add job
logger.info("edu.iu.helix.play.HelixDAGFramework | Creating Workflow.Builder.");
Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW_NAME).setExpiry(0);
logger.info("edu.iu.helix.play.HelixDAGFramework | Adding Jobs (a,b) to Workflow.Builder.");
workflowBuilder.addJob(JOB_NAME + "_a", jobConfigBuilder1);
workflowBuilder.addJob(JOB_NAME + "_b", jobConfigBuilder2);
logger.info("edu.iu.helix.play.HelixDAGFramework | Setting Job A parent of Job B.");
workflowBuilder.addParentChildDependency(JOB_NAME + "_a", JOB_NAME + "_b");
// start the workflow
logger.info("edu.iu.helix.play.HelixDAGFramework | Starting the Workflow.");
taskDriver.start(workflowBuilder.build());
// waiting for job to complete
logger.info("edu.iu.helix.play.HelixDAGFramework | Waiting for Workflow to COMPLETE.");
// taskDriver.pollForJobState(WORKFLOW_NAME, WORKFLOW_NAME + "_" + JOB_NAME, TaskState.COMPLETED);
taskDriver.pollForWorkflowState(WORKFLOW_NAME, TaskState.COMPLETED);
// job completed, exit
logger.info("edu.iu.helix.play.HelixDAGFramework | Job Completed, Exiting.");
} catch (Exception ex) {
logger.error("Exception caught | ex: " + ex.getMessage(), ex);
} finally {
// disconnect all managers
logger.info("edu.iu.helix.play.HelixDAGFramework | Disconnecting All Managers (Participant, Controller, Admin).");
disconnectManagers();
logger.info("edu.iu.helix.play.HelixDAGFramework | Bye!");
}
}
private static void createCluster() {
logger.info("edu.iu.helix.play.HelixDAGFramework | Creating a cluster.");
_setupTool = new ClusterSetup(ZK_ADDRESS);
_setupTool.addCluster(CLUSTER_NAME, true);
}
private static void addInstanceToCluster() {
logger.info("edu.iu.helix.play.HelixDAGFramework | Adding instanace to cluster.");
_setupTool.addInstanceToCluster(CLUSTER_NAME, PARTICIPANT_ADDRESS);
}
private static void addResourceToCluster() {
logger.info("edu.iu.helix.play.HelixDAGFramework | Adding resource to cluster.");
_setupTool.addResourceToCluster(CLUSTER_NAME, RESOURCE_NAME, 1, ONLINE_OFFLINE);
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, RESOURCE_NAME, 1);
}
private static void createParticipantManager() {
logger.info("edu.iu.helix.play.HelixDAGFramework | Creating a Participant Manager.");
String instanceName = PARTICIPANT_ADDRESS.replaceAll(":", "_");
participantManager = new ZKHelixManager(CLUSTER_NAME, instanceName, InstanceType.PARTICIPANT, ZK_ADDRESS);
logger.info("edu.iu.helix.play.HelixDAGFramework | Registering SMF for Participant.");
StateMachineEngine sme = participantManager.getStateMachineEngine();
sme.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), new OnlineOfflineStateModelFactory());
try {
logger.info("edu.iu.helix.play.HelixDAGFramework | Registering Task for Participant.");
registerTaskAndStateModel();
logger.info("edu.iu.helix.play.HelixDAGFramework | Starting Participant Manager.");
participantManager.connect();
} catch (Exception ex) {
logger.error("Error creating Participant Manager, ex: " + ex, ex);
}
}
private static void createAdminManager() {
logger.info("edu.iu.helix.play.HelixDAGFramework | Creating a Admin Manager.");
adminManager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, ADMIN_NAME, InstanceType.ADMINISTRATOR, ZK_ADDRESS);
try {
logger.info("edu.iu.helix.play.HelixDAGFramework | Starting Admin Manager.");
adminManager.connect();
} catch (Exception ex) {
logger.error("Error creating Admin Manager, ex: " + ex, ex);
}
}
private static void createControllerManager() {
logger.info("edu.iu.helix.play.HelixDAGFramework | Creating a Controller Manager.");
controllerManager = new ZKHelixManager(CLUSTER_NAME, CONTROLLER_NAME, InstanceType.CONTROLLER, ZK_ADDRESS);
try {
logger.info("edu.iu.helix.play.HelixDAGFramework | Starting Controller Manager.");
controllerManager.connect();
} catch (Exception ex) {
logger.error("Error creating Controller Manager, ex: " + ex, ex);
}
}
private static void registerTaskAndStateModel() {
logger.info("edu.iu.helix.play.HelixDAGFramework | Registering Task.");
Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
taskRegistry.put(HelixTaskA.TASK_COMMAND, new TaskFactory() {
@Override
public Task createNewTask(TaskCallbackContext context) {
return new HelixTaskA(context);
}
});
taskRegistry.put(HelixTaskB.TASK_COMMAND, new TaskFactory() {
@Override
public Task createNewTask(TaskCallbackContext context) {
return new HelixTaskB(context);
}
});
logger.info("edu.iu.helix.play.HelixDAGFramework | Registering Task StateModel Factory.");
StateMachineEngine sme = participantManager.getStateMachineEngine();
sme.registerStateModelFactory("Task", new TaskStateModelFactory(participantManager, taskRegistry));
}
private static void verifyClusterState() {
logger.info("edu.iu.helix.play.HelixDAGFramework | Verifying Cluster State.");
boolean result = ClusterStateVerifier.verifyByZkCallback(
new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDRESS, CLUSTER_NAME));
Assert.assertTrue(result);
}
private static void disconnectManagers() {
logger.info("edu.iu.helix.play.HelixDAGFramework | Disconnecting Admin Manager.");
adminManager.disconnect();
logger.info("edu.iu.helix.play.HelixDAGFramework | Disconnecting Participant Manager.");
participantManager.disconnect();
logger.info("edu.iu.helix.play.HelixDAGFramework | Disconnecting Contoller Manager.");
controllerManager.disconnect();
System.exit(0);
}
}