Add Apache Helix playground project to sandbox
diff --git a/helix-playground/README.md b/helix-playground/README.md
new file mode 100644
index 0000000..3be47ce
--- /dev/null
+++ b/helix-playground/README.md
@@ -0,0 +1,134 @@
+# Using Apache Helix To Perform Distributed Task Execution in Apache Airavata
+
+## Introduction
+
+This project is a playground to test Apache Helix's task execution framework. Apache Helix is a generic cluster management framework that allows one to build highly scalable and fault-tolerant distributed systems. It provides a range of functionality, including:
+* Automatic assignment of resources (task executors) and their partitions (the parallelism of a resource) to nodes in the cluster.
+* Detecting failure of nodes in the cluster and taking appropriate actions to recover them.
+* Cluster management – dynamically adding nodes and resources to the cluster, and load balancing.
+* The ability to define an IDEAL STATE for a node, and to define STATE transitions to apply when a node's state deviates from the IDEAL one.
+
+Apart from these, Helix also provides out-of-the-box APIs to perform distributed task execution. Some of the concepts Helix uses map well to our Airavata task execution use-case. These concepts include:
+* Tasks – the actual runnable logic (e.g. job submission, data staging, etc.). A task returns a TaskResult object which contains the state of the task once completed: COMPLETED, FAILED, or FATAL_FAILED. The difference between FAILED and FATAL_FAILED is that FAILED tasks are re-run by Helix (a retry threshold can be set), whereas FATAL_FAILED tasks are not.
+* Jobs – a combination of tasks without dependencies; i.e. if a job has more than one task, the tasks run in parallel across workers.
+* Workflow – a combination of jobs arranged in a DAG. A ONE-TIME workflow ends once all of its jobs are completed. A RECURRING workflow can be scheduled to run periodically.
+* Job Queues – another type of workflow that never ends; it keeps accepting new incoming jobs, and ends only when the user terminates it.
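The FAILED vs. FATAL_FAILED retry semantics above can be sketched with a small, self-contained stand-in (plain Java, not the Helix API; all names here are illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in (NOT the Helix API) mimicking how Helix re-runs FAILED
// tasks up to a configurable threshold, while FATAL_FAILED ends the task at once.
public class RetrySketch {
    enum Status { COMPLETED, FAILED, FATAL_FAILED }

    static Status runWithRetries(Supplier<Status> task, int maxAttempts) {
        Status last = Status.FAILED;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            last = task.get();
            if (last != Status.FAILED) { // COMPLETED and FATAL_FAILED both end the task
                return last;
            }
        }
        return last; // still FAILED after exhausting the retry threshold
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // fails twice, then completes – mirrors HelixTaskB in the sample output below
        Status s = runWithRetries(
                () -> calls.incrementAndGet() < 3 ? Status.FAILED : Status.COMPLETED, 3);
        System.out.println(s + " after " + calls.get() + " attempts"); // COMPLETED after 3 attempts
    }
}
```

In real Helix code, the retry threshold corresponds to the job's `setMaxAttemptsPerTask(...)` setting used later in this playground.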
+
+![helix-task-framework](images/helix-task-framework.png)
+
+* Helix also allows users to share data (key-value pairs) across Tasks/Jobs/Workflows. Content stored at the workflow layer can be shared by the jobs belonging to that workflow; similarly, content persisted at the job layer can be shared by the tasks nested in that job.
+* Helix provides APIs to POLL for a JOB or WORKFLOW to reach a particular state.
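The layered key-value sharing can be illustrated with a self-contained sketch (plain Java maps, not the Helix user-content API; names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained sketch of the scoping rule (NOT the Helix API): values written at
// the workflow layer are visible to every job in the workflow; values written at
// the job layer are visible only to tasks inside that job.
public class ScopedStoreSketch {
    private final Map<String, String> workflowStore = new HashMap<>();
    private final Map<String, Map<String, String>> jobStores = new HashMap<>();

    void putWorkflowContent(String key, String value) {
        workflowStore.put(key, value);
    }

    void putJobContent(String job, String key, String value) {
        jobStores.computeIfAbsent(job, j -> new HashMap<>()).put(key, value);
    }

    String getContent(String job, String key) {
        // job scope shadows workflow scope, mirroring the layered lookup described above
        String v = jobStores.getOrDefault(job, Map.of()).get(key);
        return v != null ? v : workflowStore.get(key);
    }

    public static void main(String[] args) {
        ScopedStoreSketch store = new ScopedStoreSketch();
        store.putWorkflowContent("experiment_id", "exp-42"); // visible to all jobs
        store.putJobContent("job_a", "token", "abc");        // visible only inside job_a
        System.out.println(store.getContent("job_b", "experiment_id")); // exp-42
        System.out.println(store.getContent("job_b", "token"));         // null
    }
}
```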
+
+Some core concepts used in Helix that are important to know:
+* Participant – a node in a Helix cluster (a.k.a. an instance or worker) which hosts resources (a.k.a. tasks).
+* Controller – a node in a Helix cluster that monitors and controls the Participant nodes. The controller is responsible for checking whether the state of a participant node matches the IDEAL state, and if not, performing STATE TRANSITIONS to bring that node back to the IDEAL state.
+* State Model & State Transitions – Helix allows developers to define what state a participant node needs to be in to be declared healthy. For example, in an ONLINE-OFFLINE state model, a node is healthy if it is in the ONLINE state; if it goes OFFLINE (for any reason), we can define TRANSITION actions to bring it back ONLINE.
+* Cluster – contains participant and controller nodes. One can define the state model for a cluster.
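The controller's reconciliation loop can be sketched as follows (a hedged, self-contained illustration, not Helix code; participant names are borrowed from this playground's output):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hedged sketch (NOT Helix code): a controller compares each participant's current
// state against the cluster's IDEAL state and issues a transition when they differ.
public class ControllerSketch {
    enum State { OFFLINE, ONLINE }

    static String reconcile(String participant, State current, State ideal) {
        if (current == ideal) {
            return participant + ": healthy (" + current + ")";
        }
        // a real controller would fire the registered OFFLINE -> ONLINE transition callback
        return participant + ": transition " + current + " -> " + ideal;
    }

    public static void main(String[] args) {
        Map<String, State> currentStates = new LinkedHashMap<>();
        currentStates.put("HelixDemoParticipant_0", State.ONLINE);
        currentStates.put("HelixDemoParticipant_1", State.OFFLINE);
        currentStates.forEach((name, state) ->
                System.out.println(reconcile(name, state, State.ONLINE)));
        // HelixDemoParticipant_0: healthy (ONLINE)
        // HelixDemoParticipant_1: transition OFFLINE -> ONLINE
    }
}
```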
+
+More details about Apache Helix can be found on their website [https://helix.apache.org](https://helix.apache.org).
+
+## How can Helix be used in Airavata?
+Assuming we use Helix just to perform distributed task execution, I have the following in mind:
+* Create Helix Tasks (by implementing the Task interface) for each of our task types – job submission, data staging, etc. These tasks are registered as resources.
+* Create Participant nodes (a.k.a. workers) to hold these resources. Helix allows us to create resource partitions, such that if we need a Task to run in parallel across workers, we can set num_partitions > 1 for that resource.
+* Define a StateModel – either OnlineOffline or MasterSlave – and the necessary state transitions. With state transitions we can control the behavior of the participant nodes.
+* Create a WORKFLOW to execute a single experiment. This workflow will contain the DAG necessary to run that experiment.
+* Create a long-running QUEUE to keep accepting incoming experiment requests. Each new experiment request will result in the creation of a new JOB added to this queue – this job will contain one task, which is to create and run the workflow mentioned in the bullet above.
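The queue-plus-workflow design in the last two bullets can be sketched with a stdlib analogue (a BlockingQueue standing in for the Helix JobQueue; the names and the `runWorkflowFor` helper are illustrative, not Airavata or Helix APIs):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stdlib analogue of the proposed design: a long-running queue accepts
// experiment requests; each request becomes a single "job" whose only task is to
// build and run the workflow (DAG) for that experiment.
public class ExperimentQueueSketch {
    static String runWorkflowFor(String experimentId) {
        // in the real design this would build a Helix Workflow and submit it via TaskDriver
        return "workflow completed for " + experimentId;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> experimentRequests = new LinkedBlockingQueue<>();
        experimentRequests.put("exp-001");
        experimentRequests.put("exp-002");

        // a Helix JobQueue would keep polling forever; we drain two requests for the demo
        for (int i = 0; i < 2; i++) {
            String experimentId = experimentRequests.take();
            System.out.println(runWorkflowFor(experimentId));
        }
    }
}
```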
+
+## Building this Project
+This project uses Apache Maven to build the artifacts. Run the following command to make sure the project builds successfully.
+```cmd
+$ mvn clean install
+```
+
+## Running the Prototype
+Open the project in an IDE of your choice. Open the `HelixClusterManager.java` class. You can control the number of participant nodes (workers) in the cluster by updating the `numWorkers` field in the `main` method.
+
+The output after running the program should look as follows:
+```cmd
+Starting helix manager for cluster [ HelixDemoCluster ], on ZK server [ localhost:2199 ], with [ 3 ] workers, having [ 1] partitions.
+0 [main] WARN org.apache.helix.manager.zk.ZKHelixAdmin - Root directory exists.Cleaning the root directory:/HelixDemoCluster
+Successfully created helix cluster: HelixDemoCluster, with [ 1 ] partitions.
+Successfully started participant node: HelixDemoParticipant_0, on cluster: HelixDemoCluster
+Successfully started participant node: HelixDemoParticipant_1, on cluster: HelixDemoCluster
+Successfully started participant node: HelixDemoParticipant_2, on cluster: HelixDemoCluster
+548 [pool-1-thread-3] WARN org.apache.helix.healthcheck.ParticipantHealthReportTask - ParticipantHealthReportTimerTask already stopped
+548 [pool-1-thread-2] WARN org.apache.helix.healthcheck.ParticipantHealthReportTask - ParticipantHealthReportTimerTask already stopped
+548 [pool-1-thread-1] WARN org.apache.helix.healthcheck.ParticipantHealthReportTask - ParticipantHealthReportTimerTask already stopped
+631 [pool-1-thread-1] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.messaging.handling.HelixTaskExecutor@fc497fe, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_0/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+631 [pool-1-thread-3] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.messaging.handling.HelixTaskExecutor@7bd93e51, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_2/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+631 [pool-1-thread-2] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.messaging.handling.HelixTaskExecutor@3bcaa92a, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_1/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+Successfully added resources to cluster: HelixDemoCluster
+736 [Thread-17] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.messaging.handling.HelixTaskExecutor@3046372e, path: /HelixDemoCluster/CONTROLLER/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+737 [Thread-17] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/CONFIGS/PARTICIPANT, expected types: [CALLBACK, FINALIZE] but was INIT
+737 [Thread-17] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_2/CURRENTSTATES/15ca80eae2e02bd, expected types: [CALLBACK, FINALIZE] but was INIT
+737 [Thread-17] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_1/CURRENTSTATES/15ca80eae2e02be, expected types: [CALLBACK, FINALIZE] but was INIT
+737 [Thread-17] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_0/CURRENTSTATES/15ca80eae2e02bf, expected types: [CALLBACK, FINALIZE] but was INIT
+737 [Thread-17] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_0/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+737 [Thread-17] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_1/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+737 [Thread-17] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_2/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+737 [Thread-17] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/LIVEINSTANCES, expected types: [CALLBACK, FINALIZE] but was INIT
+737 [Thread-17] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/IDEALSTATES, expected types: [CALLBACK, FINALIZE] but was INIT
+737 [Thread-17] WARN org.apache.helix.manager.zk.CallbackHandler - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/CONTROLLER, expected types: [CALLBACK, FINALIZE] but was INIT
+Successfully started the controller node: HelixDemoController, on cluster: HelixDemoCluster
+Successfully started the helix cluster manager (admin), on cluster: HelixDemoCluster
+Submitting Workflow for DagType: TYPE_A
+839 [ZkClient-EventThread-33-localhost:2199] WARN org.apache.helix.ConfigAccessor - No config found at /HelixDemoCluster/CONFIGS/RESOURCE/HelixTask_A
+844 [ZkClient-EventThread-33-localhost:2199] WARN org.apache.helix.ConfigAccessor - No config found at /HelixDemoCluster/CONFIGS/RESOURCE/HelixTask_C
+846 [ZkClient-EventThread-33-localhost:2199] WARN org.apache.helix.ConfigAccessor - No config found at /HelixDemoCluster/CONFIGS/RESOURCE/HelixTask_D
+849 [ZkClient-EventThread-33-localhost:2199] WARN org.apache.helix.ConfigAccessor - No config found at /HelixDemoCluster/CONFIGS/RESOURCE/HelixTask_B
+OnlineOfflineStateModelFactory.onBecomeOnlineFromOffline():HelixDemoParticipant_0 transitioning from OFFLINE to ONLINE for HelixTask_C HelixTask_C_0
+OnlineOfflineStateModelFactory.onBecomeOnlineFromOffline():HelixDemoParticipant_0 transitioning from OFFLINE to ONLINE for HelixTask_A HelixTask_A_0
+OnlineOfflineStateModelFactory.onBecomeOnlineFromOffline():HelixDemoParticipant_0 transitioning from OFFLINE to ONLINE for HelixTask_D HelixTask_D_0
+OnlineOfflineStateModelFactory.onBecomeOnlineFromOffline():HelixDemoParticipant_0 transitioning from OFFLINE to ONLINE for HelixTask_B HelixTask_B_0
+Started workflow for DagType: TYPE_A, in cluster: HelixDemoCluster
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322}{}{}
+HelixTaskA | callbackContext: org.apache.helix.task.TaskCallbackContext@2e4d5c35
+HelixTaskA | Inside run(), sleeping for 2 secs
+HelixTaskA | Inside addToUserStore()
+HelixTaskA | Returning status : COMPLETED.
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=IN_PROGRESS}}{}
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED}}{}
+HelixTaskB | callbackContext: org.apache.helix.task.TaskCallbackContext@351b6cb3
+HelixTaskB | Returning status FAILED, Helix will retry this task. Retry count: 1
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=IN_PROGRESS}}{}
+HelixTaskB | callbackContext: org.apache.helix.task.TaskCallbackContext@40decb7c
+HelixTaskB | Returning status FAILED, Helix will retry this task. Retry count: 2
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=IN_PROGRESS}}{}
+HelixTaskB | callbackContext: org.apache.helix.task.TaskCallbackContext@38a77770
+HelixTaskB | After 2 retries, Inside run(), sleeping for 2 secs
+HelixTaskB | Inside getFromUserStore()
+HelixTaskB | Retrieved from UserStore : Gourav Shenoy
+HelixTaskB | Returning status : COMPLETED.
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=IN_PROGRESS}}{}
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=COMPLETED}}{}
+HelixTaskC | callbackContext: org.apache.helix.task.TaskCallbackContext@36cf68e3
+HelixTaskC | Inside run(), sleeping for 2 secs
+HelixTaskC | Inside getFromUserStore()
+HelixTaskC | Retrieved from UserStore : Gourav Shenoy
+HelixTaskC | Returning status : COMPLETED.
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=COMPLETED, helix_workflow_helix_job_c=IN_PROGRESS}}{}
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=COMPLETED, helix_workflow_helix_job_c=COMPLETED, helix_workflow_helix_job_d=IN_PROGRESS}}{}
+HelixTaskD | callbackContext: org.apache.helix.task.TaskCallbackContext@79cba411
+HelixTaskD | Inside run(), sleeping for 2 secs
+HelixTaskD | Inside getFromUserStore()
+HelixTaskD | Retrieved from UserStore : Gourav Shenoy
+HelixTaskD | Returning status : COMPLETED.
+1890 [Thread-19] WARN org.apache.helix.task.TaskRebalancer - Idealstate for resource helix_workflow_helix_job_a does not exist.
+BLAH WKFLW: WorkflowContext, {FINISH_TIME=1498159502302, START_TIME=1498159501322, STATE=COMPLETED}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=COMPLETED, helix_workflow_helix_job_c=COMPLETED, helix_workflow_helix_job_d=COMPLETED}}{}
+Successfully completed workflow for Dag: TYPE_A
+*** Exiting System ***
+1900 [Thread-19] WARN org.apache.helix.task.TaskRebalancer - Idealstate for resource helix_workflow_helix_job_b does not exist.
+1910 [Thread-19] WARN org.apache.helix.task.TaskRebalancer - Idealstate for resource helix_workflow_helix_job_c does not exist.
+1931 [Thread-19] ERROR org.apache.helix.task.JobRebalancer - Job configuration is NULL for helix_workflow_helix_job_d
+1939 [Thread-19] WARN org.apache.helix.model.IdealState - Resource key:helix_workflow_helix_job_d_0 does not have a pre-computed preference list.
+HelixManager Admin disconnecting from cluster: HelixDemoCluster
+6211 [Thread-6] WARN org.apache.helix.participant.statemachine.StateModel - Default reset method invoked. Either because the process longer own this resource or session timedout
+6212 [Thread-6] WARN org.apache.helix.participant.statemachine.StateModel - Default reset method invoked. Either because the process longer own this resource or session timedout
+6212 [Thread-6] WARN org.apache.helix.participant.statemachine.StateModel - Default reset method invoked. Either because the process longer own this resource or session timedout
+6212 [Thread-6] WARN org.apache.helix.participant.statemachine.StateModel - Default reset method invoked. Either because the process longer own this resource or session timedout
+6214 [Thread-6] WARN org.apache.helix.participant.statemachine.StateModel - Default reset method invoked. Either because the process longer own this resource or session timedout
+6214 [Thread-6] WARN org.apache.helix.participant.statemachine.StateModel - Default reset method invoked. Either because the process longer own this resource or session timedout
+6214 [Thread-6] WARN org.apache.helix.participant.statemachine.StateModel - Default reset method invoked. Either because the process longer own this resource or session timedout
+6214 [Thread-6] WARN org.apache.helix.participant.statemachine.StateModel - Default reset method invoked. Either because the process longer own this resource or session timedout
+```
diff --git a/helix-playground/images/helix-task-framework.png b/helix-playground/images/helix-task-framework.png
new file mode 100644
index 0000000..4ee24a8
--- /dev/null
+++ b/helix-playground/images/helix-task-framework.png
Binary files differ
diff --git a/helix-playground/pom.xml b/helix-playground/pom.xml
new file mode 100644
index 0000000..cae2614
--- /dev/null
+++ b/helix-playground/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--<parent>-->
+ <!--<artifactId>helix</artifactId>-->
+ <!--<groupId>org.apache.helix</groupId>-->
+ <!--<version>0.6.7</version>-->
+ <!--</parent>-->
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>helix-playground</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>0.6.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.5</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>6.0.1</version>
+            <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/ControllerNode.java b/helix-playground/src/main/java/edu/iu/helix/airavata/ControllerNode.java
new file mode 100644
index 0000000..ccb3fb2
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/ControllerNode.java
@@ -0,0 +1,67 @@
+package edu.iu.helix.airavata;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class ControllerNode implements Runnable {
+
+ private static final Logger logger = LogManager.getLogger(ControllerNode.class);
+
+ private String clusterName;
+ private String controllerName;
+ private String zkAddress;
+ private HelixManager zkHelixManager;
+
+ private CountDownLatch startLatch = new CountDownLatch(1);
+ private CountDownLatch stopLatch = new CountDownLatch(1);
+
+ public ControllerNode(String zkAddress, String clusterName, String controllerName) {
+ this.clusterName = clusterName;
+ this.controllerName = controllerName;
+ this.zkAddress = zkAddress;
+ }
+
+ @Override
+ public void run() {
+ try {
+ zkHelixManager = HelixControllerMain.startHelixController(zkAddress, clusterName,
+ controllerName, HelixControllerMain.STANDALONE);
+ startLatch.countDown();
+ stopLatch.await();
+ } catch (Exception ex) {
+ logger.error("Error in run() for Controller: " + controllerName + ", reason: " + ex, ex);
+ } finally {
+ disconnect();
+ }
+
+ }
+
+ public void start() {
+ new Thread(this).start();
+ try {
+ startLatch.await();
+ logger.info("Controller: " + controllerName + ", has connected to cluster: " + clusterName);
+ } catch (InterruptedException ex) {
+ logger.error("Controller: " + controllerName + ", is interrupted! reason: " + ex, ex);
+ }
+
+ }
+
+ public void stop() {
+ stopLatch.countDown();
+ }
+
+ private void disconnect() {
+ if (zkHelixManager != null) {
+ logger.info("Controller: " + controllerName + ", has disconnected from cluster: " + clusterName);
+ zkHelixManager.disconnect();
+ }
+ }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixCluster.java b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixCluster.java
new file mode 100644
index 0000000..d59737d
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixCluster.java
@@ -0,0 +1,66 @@
+package edu.iu.helix.airavata;
+
+import edu.iu.helix.airavata.tasks.HelixTaskA;
+import edu.iu.helix.airavata.tasks.HelixTaskB;
+import edu.iu.helix.airavata.tasks.HelixTaskC;
+import edu.iu.helix.airavata.tasks.HelixTaskD;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixCluster {
+
+ private static final Logger logger = LogManager.getLogger(HelixCluster.class);
+
+ private String zkAddress;
+ private String clusterName;
+ private int numPartitions;
+
+ private ZkClient zkClient;
+ private ZKHelixAdmin zkHelixAdmin;
+
+ public HelixCluster(String zkAddress, String clusterName, int numPartitions) {
+ this.zkAddress = zkAddress;
+ this.clusterName = clusterName;
+ this.numPartitions = numPartitions;
+
+ zkClient = new ZkClient(this.zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ zkHelixAdmin = new ZKHelixAdmin(zkClient);
+ }
+
+ public void setup() {
+ zkHelixAdmin.addCluster(clusterName, true);
+ zkHelixAdmin.addStateModelDef(clusterName, OnlineOfflineSMD.name, OnlineOfflineSMD.build());
+ logger.info("Cluster: " + clusterName + ", has been added.");
+ }
+
+ public void addResourcesToCluster() {
+ String stateModel = BuiltInStateModelDefinitions.OnlineOffline.name();
+ zkHelixAdmin.addResource(clusterName, HelixTaskA.TASK_COMMAND, numPartitions, stateModel);
+ zkHelixAdmin.addResource(clusterName, HelixTaskB.TASK_COMMAND, numPartitions, stateModel);
+ zkHelixAdmin.addResource(clusterName, HelixTaskC.TASK_COMMAND, numPartitions, stateModel);
+ zkHelixAdmin.addResource(clusterName, HelixTaskD.TASK_COMMAND, numPartitions, stateModel);
+ logger.debug("Resources (A,B,C,D) with [ " + numPartitions + " ] partitions have been added to Cluster: " + clusterName);
+
+ zkHelixAdmin.rebalance(clusterName, HelixTaskA.TASK_COMMAND, 1);
+ zkHelixAdmin.rebalance(clusterName, HelixTaskB.TASK_COMMAND, 1);
+ zkHelixAdmin.rebalance(clusterName, HelixTaskC.TASK_COMMAND, 1);
+ zkHelixAdmin.rebalance(clusterName, HelixTaskD.TASK_COMMAND, 1);
+ logger.debug("Resources (A,B,C,D) have been rebalanced");
+ }
+
+ public void disconnect() {
+ if (zkClient != null) {
+ zkClient.close();
+ }
+ }
+
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java
new file mode 100644
index 0000000..313a674
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java
@@ -0,0 +1,158 @@
+package edu.iu.helix.airavata;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixClusterManager {
+
+ private static final Logger logger = LogManager.getLogger(HelixClusterManager.class);
+
+ private int numWorkers;
+ private int numPartitions;
+ private String clusterName;
+ private String zkAddress;
+ private HelixManager helixManager;
+ private TaskDriver taskDriver;
+
+ private ControllerNode controllerNode;
+ private HelixCluster helixCluster;
+ private static final String CONTROLLER_NAME = "HelixDemoController";
+ private static final String PARTICIPANT_NAME = "HelixDemoParticipant_";
+
+ public HelixClusterManager(String clusterName, String zkAddress, int numWorkers, int numPartitions) {
+ this.numWorkers = numWorkers;
+ this.numPartitions = numPartitions;
+ this.clusterName = clusterName;
+ this.zkAddress = zkAddress;
+ }
+
+ public void startHelixCluster() {
+ // create the cluster
+ helixCluster = new HelixCluster(zkAddress, clusterName, numPartitions);
+ helixCluster.setup();
+ System.out.println("Successfully created helix cluster: " + clusterName + ", with [ " + numPartitions + " ] partitions.");
+
+ // start the participants
+ Executor executor = Executors.newFixedThreadPool(numWorkers);
+ for (int i = 0; i < numWorkers; i++) {
+ String participantName = PARTICIPANT_NAME + i;
+ ParticipantNode participant = new ParticipantNode(zkAddress, clusterName, participantName);
+ executor.execute(participant);
+ System.out.println("Successfully started participant node: " + participantName + ", on cluster: " + clusterName);
+ }
+
+ // add resources to cluster
+ helixCluster.addResourcesToCluster();
+ System.out.println("Successfully added resources to cluster: " + clusterName);
+
+ // start the controller
+ controllerNode = new ControllerNode(zkAddress, clusterName, CONTROLLER_NAME);
+ controllerNode.start();
+ System.out.println("Successfully started the controller node: " + CONTROLLER_NAME + ", on cluster: " + clusterName);
+
+ // start the cluster manager
+ connect();
+ System.out.println("Successfully started the helix cluster manager (admin), on cluster: " + clusterName);
+ }
+
+ private void connect() {
+
+ this.helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "Admin",
+ InstanceType.SPECTATOR, zkAddress);
+ try {
+ this.helixManager.connect();
+ } catch (Exception ex) {
+ logger.error("Error in connect() for Admin, reason: " + ex, ex);
+ }
+
+ this.taskDriver = new TaskDriver(helixManager);
+ logger.debug("HelixManager Admin connected.");
+
+ Runtime.getRuntime().addShutdownHook(
+ new Thread() {
+ @Override
+ public void run() {
+ disconnect();
+ controllerNode.stop();
+ helixCluster.disconnect();
+ }
+ }
+ );
+ }
+
+ private void disconnect() {
+ if (helixManager != null) {
+ System.out.println("HelixManager Admin disconnecting from cluster: " + clusterName);
+ helixManager.disconnect();
+ }
+ }
+
+ public boolean submitDag(HelixUtil.DAGType dagType) {
+ Workflow workflow = HelixUtil.getWorkflow(dagType);
+ taskDriver.start(workflow);
+ System.out.println("Started workflow for DagType: " + dagType + ", in cluster: " + clusterName);
+ try {
+ taskDriver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED, TaskState.FAILED);
+ return true;
+ } catch (Exception ex) {
+ logger.error("Error submitting Dag for type: " + dagType + ", reason: " + ex, ex);
+ return false;
+ }
+ }
+
+ public static void main(String[] args) {
+ String clusterName = "HelixDemoCluster";
+ String zkAddress = "localhost:2199";
+ int numWorkers = 3;
+ int numPartitions = 1;
+
+ try {
+ System.out.println("Starting helix manager for cluster [ " + clusterName + " ], " +
+ "on ZK server [ " + zkAddress + " ], " +
+ "with [ " + numWorkers + " ] workers, " +
+                    "having [ " + numPartitions + " ] partitions.");
+ HelixClusterManager manager = new HelixClusterManager(clusterName, zkAddress, numWorkers, numPartitions);
+ manager.startHelixCluster();
+
+ System.out.println("Submitting Workflow for DagType: " + HelixUtil.DAGType.TYPE_A);
+ if (manager.submitDag(HelixUtil.DAGType.TYPE_A)) {
+ System.out.println("Successfully completed workflow for Dag: " + HelixUtil.DAGType.TYPE_A);
+ } else {
+ throw new Exception("Failed to run workflow for Dag: " + HelixUtil.DAGType.TYPE_A);
+ }
+ } catch (Exception ex) {
+ logger.error("Something went wrong while running helix cluster manager. Reason: " + ex, ex);
+ }
+
+ System.out.println("*** Exiting System ***");
+// System.exit(0);
+ }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java
new file mode 100644
index 0000000..2cea68f
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java
@@ -0,0 +1,82 @@
+package edu.iu.helix.airavata;
+
+import edu.iu.helix.airavata.tasks.HelixTaskA;
+import edu.iu.helix.airavata.tasks.HelixTaskB;
+import edu.iu.helix.airavata.tasks.HelixTaskC;
+import edu.iu.helix.airavata.tasks.HelixTaskD;
+import org.apache.helix.task.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixUtil {
+
+ public static final String TASK_STATE_DEF = "Task";
+
+ public enum DAGType {
+ TYPE_A,
+ TYPE_B,
+ TYPE_C
+ }
+
+ private static Workflow.Builder getWorkflowBuilder(DAGType dagType) {
+ // create task configs
+ List<TaskConfig> taskConfig1 = new ArrayList<TaskConfig>();
+ List<TaskConfig> taskConfig2 = new ArrayList<TaskConfig>();
+ List<TaskConfig> taskConfig3 = new ArrayList<TaskConfig>();
+ List<TaskConfig> taskConfig4 = new ArrayList<TaskConfig>();
+ taskConfig1.add(new TaskConfig.Builder().setTaskId("helix_task_a").setCommand(HelixTaskA.TASK_COMMAND).build());
+ taskConfig2.add(new TaskConfig.Builder().setTaskId("helix_task_b").setCommand(HelixTaskB.TASK_COMMAND).build());
+ taskConfig3.add(new TaskConfig.Builder().setTaskId("helix_task_c").setCommand(HelixTaskC.TASK_COMMAND).build());
+ taskConfig4.add(new TaskConfig.Builder().setTaskId("helix_task_d").setCommand(HelixTaskD.TASK_COMMAND).build());
+
+ // create job configs
+ JobConfig.Builder jobConfig1 = new JobConfig.Builder().addTaskConfigs(taskConfig1).setMaxAttemptsPerTask(3);
+ JobConfig.Builder jobConfig2 = new JobConfig.Builder().addTaskConfigs(taskConfig2).setMaxAttemptsPerTask(3);
+ JobConfig.Builder jobConfig3 = new JobConfig.Builder().addTaskConfigs(taskConfig3).setMaxAttemptsPerTask(3);
+ JobConfig.Builder jobConfig4 = new JobConfig.Builder().addTaskConfigs(taskConfig4).setMaxAttemptsPerTask(3);
+
+ // create workflow
+ Workflow.Builder workflowBuilder = new Workflow.Builder("helix_workflow").setExpiry(0);
+ workflowBuilder.addJob("helix_job_a", jobConfig1);
+ workflowBuilder.addJob("helix_job_b", jobConfig2);
+ workflowBuilder.addJob("helix_job_c", jobConfig3);
+ workflowBuilder.addJob("helix_job_d", jobConfig4);
+
+
+ switch (dagType) {
+ case TYPE_A:
+ workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_b");
+ workflowBuilder.addParentChildDependency("helix_job_b", "helix_job_c");
+ workflowBuilder.addParentChildDependency("helix_job_c", "helix_job_d");
+ break;
+
+ case TYPE_B:
+ workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_c");
+ workflowBuilder.addParentChildDependency("helix_job_c", "helix_job_d");
+ workflowBuilder.addParentChildDependency("helix_job_d", "helix_job_b");
+ break;
+
+ case TYPE_C:
+ workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_d");
+ workflowBuilder.addParentChildDependency("helix_job_d", "helix_job_b");
+ workflowBuilder.addParentChildDependency("helix_job_b", "helix_job_c");
+ break;
+ }
+
+ return workflowBuilder;
+ }
+
+ public static Workflow getWorkflow(DAGType dagType) {
+ Workflow.Builder workflowBuilder = getWorkflowBuilder(dagType);
+ return workflowBuilder.build();
+ }
+
+ private static String generateWorkflowName() {
+ return "workflow_" + ThreadLocalRandom.current().nextInt(9999);
+ }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java b/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java
new file mode 100644
index 0000000..54466f2
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java
@@ -0,0 +1,145 @@
+package edu.iu.helix.airavata;
+
+import edu.iu.helix.airavata.tasks.HelixTaskA;
+import edu.iu.helix.airavata.tasks.HelixTaskB;
+import edu.iu.helix.airavata.tasks.HelixTaskC;
+import edu.iu.helix.airavata.tasks.HelixTaskD;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class ParticipantNode implements Runnable {
+
+ private static final Logger logger = LogManager.getLogger(ParticipantNode.class);
+
+ private String zkAddress;
+ private String clusterName;
+ private String participantName;
+ private ZKHelixManager zkHelixManager;
+
+ public ParticipantNode(String zkAddress, String clusterName, String participantName) {
+ logger.debug("Initializing Participant Node");
+ this.zkAddress = zkAddress;
+ this.clusterName = clusterName;
+ this.participantName = participantName;
+ }
+
+ @Override
+ public void run() {
+ ZkClient zkClient = null;
+ try {
+ zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);
+
+ List<String> nodesInCluster = zkHelixAdmin.getInstancesInCluster(clusterName);
+ if (!nodesInCluster.contains(participantName)) {
+ InstanceConfig instanceConfig = new InstanceConfig(participantName);
+ instanceConfig.setHostName("localhost");
+ instanceConfig.setInstanceEnabled(true);
+ zkHelixAdmin.addInstance(clusterName, instanceConfig);
+ logger.debug("Instance: " + participantName + ", has been added to cluster: " + clusterName);
+ }
+
+ Runtime.getRuntime().addShutdownHook(
+ new Thread() {
+ @Override
+ public void run() {
+ logger.debug("Participant: " + participantName + ", shutdown hook called.");
+ disconnect();
+ }
+ }
+ );
+
+ // connect the participant manager
+ connect();
+ } catch (Exception ex) {
+ logger.error("Error in run() for Participant: " + participantName + ", reason: " + ex, ex);
+ } finally {
+ if (zkClient != null) {
+ zkClient.close();
+ }
+ }
+
+ }
+
+ private void connect() {
+ try {
+ zkHelixManager = new ZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, zkAddress);
+
+ // register online-offline model
+ StateMachineEngine machineEngine = zkHelixManager.getStateMachineEngine();
+ OnlineOfflineStateModelFactory factory = new OnlineOfflineStateModelFactory(participantName);
+ machineEngine.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), factory);
+
+ // register task model
+ Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+ taskRegistry.put(HelixTaskA.TASK_COMMAND, new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new HelixTaskA(context);
+ }
+ });
+ taskRegistry.put(HelixTaskB.TASK_COMMAND, new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new HelixTaskB(context);
+ }
+ });
+ taskRegistry.put(HelixTaskC.TASK_COMMAND, new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new HelixTaskC(context);
+ }
+ });
+ taskRegistry.put(HelixTaskD.TASK_COMMAND, new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new HelixTaskD(context);
+ }
+ });
+
+ machineEngine.registerStateModelFactory(HelixUtil.TASK_STATE_DEF,
+ new TaskStateModelFactory(zkHelixManager, taskRegistry));
+ logger.debug("Participant: " + participantName + ", registered state model factories.");
+
+ zkHelixManager.connect();
+ logger.info("Participant: " + participantName + ", has connected to cluster: " + clusterName);
+
+ Thread.currentThread().join(); // block indefinitely to keep the participant running
+ } catch (InterruptedException ex) {
+ logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex);
+ }
+ catch (Exception ex) {
+ logger.error("Error in connect() for Participant: " + participantName + ", reason: " + ex, ex);
+ } finally {
+ disconnect();
+ }
+ }
+
+ private void disconnect() {
+ if (zkHelixManager != null) {
+ zkHelixManager.disconnect();
+ logger.info("Participant: " + participantName + ", has disconnected from cluster: " + clusterName);
+ }
+ }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskA.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskA.java
new file mode 100644
index 0000000..cdae18e
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskA.java
@@ -0,0 +1,49 @@
+package edu.iu.helix.airavata.tasks;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixTaskA extends UserContentStore implements Task {
+
+ private static Logger logger = LogManager.getLogger(HelixTaskA.class);
+ public static final String TASK_COMMAND = "HelixTask_A";
+ private static int count = 0;
+
+ public HelixTaskA(TaskCallbackContext callbackContext) {
+ System.out.println("HelixTaskA | callbackContext: " + callbackContext);
+ }
+
+ @Override
+ public TaskResult run() {
+ System.out.println("HelixTaskA | Inside run()");
+ addToUserStore();
+// sleep(2000);
+ System.out.println("HelixTaskA | Returning status : COMPLETED.");
+ return new TaskResult(TaskResult.Status.COMPLETED, "HelixTask completed!");
+ }
+
+ @Override
+ public void cancel() {
+ System.out.println("HelixTaskA | Inside cancel()");
+ }
+
+ private void addToUserStore() {
+ System.out.println("HelixTaskA | Inside addToUserStore()");
+ putUserContent("fullName", "Gourav Shenoy", Scope.WORKFLOW);
+ }
+
+ private static void sleep(long d) {
+ try {
+ Thread.sleep(d);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskB.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskB.java
new file mode 100644
index 0000000..9af9c99
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskB.java
@@ -0,0 +1,59 @@
+package edu.iu.helix.airavata.tasks;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixTaskB extends UserContentStore implements Task {
+
+ private static Logger logger = LogManager.getLogger(HelixTaskB.class);
+ public static final String TASK_COMMAND = "HelixTask_B";
+
+ private static int retryCount = 0;
+ public HelixTaskB(TaskCallbackContext callbackContext) {
+ System.out.println("HelixTaskB | callbackContext: " + callbackContext);
+ }
+
+ @Override
+ public TaskResult run() {
+ if (retryCount < 2) {
+ retryCount++;
+ System.out.println("HelixTaskB | Returning status FAILED, Helix will retry this task. Retry count: " + retryCount);
+ return new TaskResult(TaskResult.Status.FAILED, "HelixTaskB should be retried!");
+ }
+
+ System.out.println("HelixTaskB | After 2 retries, Inside run()");
+// sleep(2000);
+ System.out.println("HelixTaskB | Retrieved from UserStore : " + getFromUserStore("fullName"));
+
+// System.out.println("HelixTaskB | Returning status : FATAL_FAILED.");
+// return new TaskResult(TaskResult.Status.FATAL_FAILED, "edu.iu.helix.play.HelixTaskB completed!");
+
+ System.out.println("HelixTaskB | Returning status : COMPLETED.");
+ return new TaskResult(TaskResult.Status.COMPLETED, "HelixTaskB completed!");
+ }
+
+ @Override
+ public void cancel() {
+ System.out.println("HelixTaskB | Inside cancel()");
+ }
+
+ private String getFromUserStore(String key) {
+ System.out.println("HelixTaskB | Inside getFromUserStore()");
+ return getUserContent(key, Scope.WORKFLOW);
+ }
+
+ private static void sleep(long d) {
+ try {
+ Thread.sleep(d);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskC.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskC.java
new file mode 100644
index 0000000..2132001
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskC.java
@@ -0,0 +1,48 @@
+package edu.iu.helix.airavata.tasks;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixTaskC extends UserContentStore implements Task {
+
+ private static Logger logger = LogManager.getLogger(HelixTaskC.class);
+ public static final String TASK_COMMAND = "HelixTask_C";
+
+ public HelixTaskC(TaskCallbackContext callbackContext) {
+ System.out.println("HelixTaskC | callbackContext: " + callbackContext);
+ }
+
+ @Override
+ public TaskResult run() {
+ System.out.println("HelixTaskC | Inside run()");
+// sleep(2000);
+ System.out.println("HelixTaskC | Retrieved from UserStore : " + getFromUserStore("fullName"));
+ System.out.println("HelixTaskC | Returning status : COMPLETED.");
+ return new TaskResult(TaskResult.Status.COMPLETED, "HelixTaskC completed!");
+ }
+
+ @Override
+ public void cancel() {
+ System.out.println("HelixTaskC | Inside cancel()");
+ }
+
+ private String getFromUserStore(String key) {
+ System.out.println("HelixTaskC | Inside getFromUserStore()");
+ return getUserContent(key, Scope.WORKFLOW);
+ }
+
+ private static void sleep(long d) {
+ try {
+ Thread.sleep(d);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskD.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskD.java
new file mode 100644
index 0000000..52488e8
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskD.java
@@ -0,0 +1,48 @@
+package edu.iu.helix.airavata.tasks;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixTaskD extends UserContentStore implements Task {
+
+ private static Logger logger = LogManager.getLogger(HelixTaskD.class);
+ public static final String TASK_COMMAND = "HelixTask_D";
+
+ public HelixTaskD(TaskCallbackContext callbackContext) {
+ System.out.println("HelixTaskD | callbackContext: " + callbackContext);
+ }
+
+ @Override
+ public TaskResult run() {
+ System.out.println("HelixTaskD | Inside run()");
+// sleep(2000);
+ System.out.println("HelixTaskD | Retrieved from UserStore : " + getFromUserStore("fullName"));
+ System.out.println("HelixTaskD | Returning status : COMPLETED.");
+ return new TaskResult(TaskResult.Status.COMPLETED, "HelixTaskD completed!");
+ }
+
+ @Override
+ public void cancel() {
+ System.out.println("HelixTaskD | Inside cancel()");
+ }
+
+ private String getFromUserStore(String key) {
+ System.out.println("HelixTaskD | Inside getFromUserStore()");
+ return getUserContent(key, Scope.WORKFLOW);
+ }
+
+ private static void sleep(long d) {
+ try {
+ Thread.sleep(d);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/play/HelixDAGFramework.java b/helix-playground/src/main/java/edu/iu/helix/play/HelixDAGFramework.java
new file mode 100644
index 0000000..aee4e06
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/play/HelixDAGFramework.java
@@ -0,0 +1,221 @@
+package edu.iu.helix.play;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.*;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by goshenoy on 6/18/17.
+ */
+public class HelixDAGFramework {
+
+ private static Logger logger = LogManager.getLogger(HelixDAGFramework.class);
+
+ private static final String ZK_ADDRESS = "localhost:2199";
+ private static final String PARTICIPANT_ADDRESS = "localhost:12918";
+
+ private static final String CLUSTER_NAME = "HelixDagCluster";
+ private static final String RESOURCE_NAME = "HelixResource";
+ private static final String CONTROLLER_NAME = "HelixController";
+ private static final String ADMIN_NAME = "HelixAdmin";
+
+ private static final String WORKFLOW_NAME = "helix_workflow";
+ private static final String JOB_NAME = "helix_job";
+ private static final String TASK_A_NAME = "helix_task_id_a";
+ private static final String TASK_B_NAME = "helix_task_id_b";
+
+ private static final String ONLINE_OFFLINE = "OnlineOffline";
+
+ private static HelixManager adminManager;
+ private static ZKHelixManager participantManager;
+ private static ZKHelixManager controllerManager;
+
+ private static ClusterSetup _setupTool;
+
+
+ public static void main(String[] args) {
+ try {
+ // create cluster
+ createCluster();
+
+ // add instance, resource to cluster
+ addInstanceToCluster();
+ addResourceToCluster();
+
+ // create participant, controller & admin manager
+ createParticipantManager();
+ createControllerManager();
+ createAdminManager();
+
+ // verifying cluster state
+ verifyClusterState();
+
+ // create task-driver
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Creating TaskDriver.");
+ TaskDriver taskDriver = new TaskDriver(adminManager);
+
+ // create task-config list
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Creating TaskConfig list.");
+ List<TaskConfig> taskConfigList1 = new ArrayList<TaskConfig>();
+ List<TaskConfig> taskConfigList2 = new ArrayList<TaskConfig>();
+ taskConfigList1.add(
+ new TaskConfig.Builder().setTaskId(TASK_A_NAME).setCommand(HelixTaskA.TASK_COMMAND).build()
+ );
+ taskConfigList2.add(
+ new TaskConfig.Builder().setTaskId(TASK_B_NAME).setCommand(HelixTaskB.TASK_COMMAND).build()
+ );
+
+ // create job-config-builder
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Creating JobConfig.Builder.");
+ JobConfig.Builder jobConfigBuilder1 = new JobConfig.Builder().addTaskConfigs(taskConfigList1);
+ JobConfig.Builder jobConfigBuilder2 = new JobConfig.Builder().addTaskConfigs(taskConfigList2);
+
+ // create workflow-builder & add job
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Creating Workflow.Builder.");
+ Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW_NAME).setExpiry(0);
+
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Adding Jobs (a,b) to Workflow.Builder.");
+ workflowBuilder.addJob(JOB_NAME + "_a", jobConfigBuilder1);
+ workflowBuilder.addJob(JOB_NAME + "_b", jobConfigBuilder2);
+
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Setting Job A parent of Job B.");
+ workflowBuilder.addParentChildDependency(JOB_NAME + "_a", JOB_NAME + "_b");
+
+ // start the workflow
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Starting the Workflow.");
+ taskDriver.start(workflowBuilder.build());
+
+ // waiting for job to complete
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Waiting for Workflow to COMPLETE.");
+// taskDriver.pollForJobState(WORKFLOW_NAME, WORKFLOW_NAME + "_" + JOB_NAME, TaskState.COMPLETED);
+ taskDriver.pollForWorkflowState(WORKFLOW_NAME, TaskState.COMPLETED);
+
+ // job completed, exit
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Job Completed, Exiting.");
+ } catch (Exception ex) {
+ logger.error("Exception caught | ex: " + ex.getMessage(), ex);
+ } finally {
+ // disconnect all managers
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Disconnecting All Managers (Participant, Controller, Admin).");
+ disconnectManagers();
+
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Bye!");
+ }
+ }
+
+ private static void createCluster() {
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Creating a cluster.");
+ _setupTool = new ClusterSetup(ZK_ADDRESS);
+ _setupTool.addCluster(CLUSTER_NAME, true);
+ }
+
+ private static void addInstanceToCluster() {
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Adding instance to cluster.");
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, PARTICIPANT_ADDRESS);
+ }
+
+ private static void addResourceToCluster() {
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Adding resource to cluster.");
+ _setupTool.addResourceToCluster(CLUSTER_NAME, RESOURCE_NAME, 1, ONLINE_OFFLINE);
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, RESOURCE_NAME, 1);
+ }
+
+ private static void createParticipantManager() {
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Creating a Participant Manager.");
+ String instanceName = PARTICIPANT_ADDRESS.replaceAll(":", "_");
+ participantManager = new ZKHelixManager(CLUSTER_NAME, instanceName, InstanceType.PARTICIPANT, ZK_ADDRESS);
+
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Registering SMF for Participant.");
+ StateMachineEngine sme = participantManager.getStateMachineEngine();
+ sme.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), new OnlineOfflineStateModelFactory());
+
+ try {
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Registering Task for Participant.");
+ registerTaskAndStateModel();
+
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Starting Participant Manager.");
+ participantManager.connect();
+ } catch (Exception ex) {
+ logger.error("Error creating Participant Manager, ex: " + ex, ex);
+ }
+ }
+
+ private static void createAdminManager() {
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Creating an Admin Manager.");
+ adminManager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, ADMIN_NAME, InstanceType.ADMINISTRATOR, ZK_ADDRESS);
+ try {
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Starting Admin Manager.");
+ adminManager.connect();
+ } catch (Exception ex) {
+ logger.error("Error creating Admin Manager, ex: " + ex, ex);
+ }
+ }
+
+ private static void createControllerManager() {
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Creating a Controller Manager.");
+ controllerManager = new ZKHelixManager(CLUSTER_NAME, CONTROLLER_NAME, InstanceType.CONTROLLER, ZK_ADDRESS);
+ try {
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Starting Controller Manager.");
+ controllerManager.connect();
+ } catch (Exception ex) {
+ logger.error("Error creating Controller Manager, ex: " + ex, ex);
+ }
+ }
+
+ private static void registerTaskAndStateModel() {
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Registering Task.");
+ Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+ taskRegistry.put(HelixTaskA.TASK_COMMAND, new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new HelixTaskA(context);
+ }
+ });
+
+ taskRegistry.put(HelixTaskB.TASK_COMMAND, new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new HelixTaskB(context);
+ }
+ });
+
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Registering Task StateModel Factory.");
+ StateMachineEngine sme = participantManager.getStateMachineEngine();
+ sme.registerStateModelFactory("Task", new TaskStateModelFactory(participantManager, taskRegistry));
+ }
+
+ private static void verifyClusterState() {
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Verifying Cluster State.");
+ boolean result = ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDRESS, CLUSTER_NAME));
+ Assert.assertTrue(result);
+ }
+
+ private static void disconnectManagers() {
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Disconnecting Admin Manager.");
+ adminManager.disconnect();
+
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Disconnecting Participant Manager.");
+ participantManager.disconnect();
+
+ logger.info("edu.iu.helix.play.HelixDAGFramework | Disconnecting Controller Manager.");
+ controllerManager.disconnect();
+
+ System.exit(0);
+ }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/play/HelixTaskA.java b/helix-playground/src/main/java/edu/iu/helix/play/HelixTaskA.java
new file mode 100644
index 0000000..7859624
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/play/HelixTaskA.java
@@ -0,0 +1,48 @@
+package edu.iu.helix.play;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/19/17.
+ */
+public class HelixTaskA extends UserContentStore implements Task {
+
+ private static Logger logger = LogManager.getLogger(HelixTaskA.class);
+ public static final String TASK_COMMAND = "HelixTask-A";
+
+ HelixTaskA(TaskCallbackContext callbackContext) {
+ logger.info("edu.iu.helix.play.HelixTaskA | callbackContext: " + callbackContext);
+ }
+
+ @Override
+ public TaskResult run() {
+ logger.info("edu.iu.helix.play.HelixTaskA | Inside run(), sleeping for 5 secs");
+ addToUserStore();
+ sleep(5000);
+ logger.info("edu.iu.helix.play.HelixTaskA | Returning status : COMPLETED.");
+ return new TaskResult(TaskResult.Status.COMPLETED, "HelixTask completed!");
+ }
+
+ @Override
+ public void cancel() {
+ logger.info("edu.iu.helix.play.HelixTaskA | Inside cancel()");
+ }
+
+ private void addToUserStore() {
+ logger.info("edu.iu.helix.play.HelixTaskA | Inside addToUserStore()");
+ putUserContent("fullName", "Gourav Shenoy", Scope.WORKFLOW);
+ }
+
+ private static void sleep(long d) {
+ try {
+ Thread.sleep(d);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/play/HelixTaskB.java b/helix-playground/src/main/java/edu/iu/helix/play/HelixTaskB.java
new file mode 100644
index 0000000..b698286
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/play/HelixTaskB.java
@@ -0,0 +1,52 @@
+package edu.iu.helix.play;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/20/17.
+ */
+public class HelixTaskB extends UserContentStore implements Task {
+
+ private static Logger logger = LogManager.getLogger(HelixTaskB.class);
+ public static final String TASK_COMMAND = "HelixTask-B";
+
+ HelixTaskB(TaskCallbackContext callbackContext) {
+ logger.info("edu.iu.helix.play.HelixTaskB | callbackContext: " + callbackContext);
+ }
+
+ @Override
+ public TaskResult run() {
+ logger.info("edu.iu.helix.play.HelixTaskB | Inside run(), busy-waiting for 3 secs");
+ long expiry = System.currentTimeMillis() + 3000L;
+ while (System.currentTimeMillis() < expiry) {
+ logger.info("edu.iu.helix.play.HelixTaskB | Inside run(), *** Waiting ***");
+// sleep(50);
+ }
+ logger.info("edu.iu.helix.play.HelixTaskB | Retrieved from UserStore : " + getFromUserStore("fullName"));
+ logger.info("edu.iu.helix.play.HelixTaskB | Returning status : COMPLETED.");
+ return new TaskResult(TaskResult.Status.COMPLETED, "edu.iu.helix.play.HelixTaskB completed!");
+ }
+
+ @Override
+ public void cancel() {
+ logger.info("edu.iu.helix.play.HelixTaskB | Inside cancel()");
+ }
+
+ private String getFromUserStore(String key) {
+ logger.info("edu.iu.helix.play.HelixTaskB | Inside getFromUserStore()");
+ return getUserContent(key, Scope.WORKFLOW);
+ }
+
+ private static void sleep(long d) {
+ try {
+ Thread.sleep(d);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/helix-playground/src/main/resources/log4j.properties b/helix-playground/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6193d62
--- /dev/null
+++ b/helix-playground/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Set root logger level to WARN and its only appender to A1.
+log4j.rootLogger=WARN, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file