blob: d59737d663199060c58100190fc99f0f2266ef11 [file] [log] [blame]
package edu.iu.helix.airavata;
import edu.iu.helix.airavata.tasks.HelixTaskA;
import edu.iu.helix.airavata.tasks.HelixTaskB;
import edu.iu.helix.airavata.tasks.HelixTaskC;
import edu.iu.helix.airavata.tasks.HelixTaskD;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.OnlineOfflineSMD;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
* Created by goshenoy on 6/21/17.
*/
public class HelixCluster {
private static final Logger logger = LogManager.getLogger(HelixCluster.class);
private String zkAddress;
private String clusterName;
private int numPartitions;
private ZkClient zkClient;
private ZKHelixAdmin zkHelixAdmin;
public HelixCluster(String zkAddress, String clusterName, int numPartitions) {
this.zkAddress = zkAddress;
this.clusterName = clusterName;
this.numPartitions = numPartitions;
zkClient = new ZkClient(this.zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
zkHelixAdmin = new ZKHelixAdmin(zkClient);
}
public void setup() {
zkHelixAdmin.addCluster(clusterName, true);
zkHelixAdmin.addStateModelDef(clusterName, OnlineOfflineSMD.name, OnlineOfflineSMD.build());
logger.info("Cluster: " + clusterName + ", has been added.");
}
public void addResourcesToCluster() {
String stateModel = BuiltInStateModelDefinitions.OnlineOffline.name();
zkHelixAdmin.addResource(clusterName, HelixTaskA.TASK_COMMAND, numPartitions, stateModel);
zkHelixAdmin.addResource(clusterName, HelixTaskB.TASK_COMMAND, numPartitions, stateModel);
zkHelixAdmin.addResource(clusterName, HelixTaskC.TASK_COMMAND, numPartitions, stateModel);
zkHelixAdmin.addResource(clusterName, HelixTaskD.TASK_COMMAND, numPartitions, stateModel);
logger.debug("Resources (A,B,C,D) with [ " + numPartitions + " ] partitions have been added to Cluster: " + clusterName);
zkHelixAdmin.rebalance(clusterName, HelixTaskA.TASK_COMMAND, 1);
zkHelixAdmin.rebalance(clusterName, HelixTaskB.TASK_COMMAND, 1);
zkHelixAdmin.rebalance(clusterName, HelixTaskC.TASK_COMMAND, 1);
zkHelixAdmin.rebalance(clusterName, HelixTaskD.TASK_COMMAND, 1);
logger.debug("Resources (A,B,C,D) have been rebalanced");
}
public void disconnect() {
if (zkClient != null) {
zkClient.close();
}
}
}