In this tutorial, we cover the roles in a Helix-managed cluster and show the code you need to write to integrate with it. In many cases a simple default behavior is appropriate, but you can also customize the behavior.
Convention: we first cover the basic approach, which is the easiest to implement. Then, we'll describe advanced options, which give you more control over the system behavior, but require you to write more code.
First, we need to set up the system. Let's walk through the steps in building a distributed system using Helix. We will show how to do this using both the Java admin interface and the cluster accessor interface. You can choose whichever interface most closely matches your needs.
The following command starts ZooKeeper in standalone mode on port 2199. For production deployment, see the Apache ZooKeeper documentation for instructions.
./start-standalone-zookeeper.sh 2199 &
Creating a cluster will define the cluster in appropriate ZNodes on ZooKeeper.
Using the Java accessor API:
// Note: ZK_ADDRESS is the host:port of ZooKeeper
String ZK_ADDRESS = "localhost:2199";
HelixConnection connection = new ZKHelixConnection(ZK_ADDRESS);
ClusterId clusterId = ClusterId.from("helix-demo");
ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
ClusterConfig clusterConfig = new ClusterConfig.Builder(clusterId).build();
clusterAccessor.createCluster(clusterConfig);
OR
Using the HelixAdmin Java interface:
// Create setup tool instance
// Note: ZK_ADDRESS is the host:port of ZooKeeper
String ZK_ADDRESS = "localhost:2199";
HelixAdmin admin = new ZKHelixAdmin(ZK_ADDRESS);
String CLUSTER_NAME = "helix-demo";
// Create cluster namespace in ZooKeeper
admin.addCluster(CLUSTER_NAME);
OR
Using the command-line interface:
./helix-admin.sh --zkSvr localhost:2199 --addCluster helix-demo
First we'll add new nodes to the cluster, then configure the nodes in the cluster. Each node in the cluster must be uniquely identifiable. The most commonly used convention is hostname_port.
int NUM_NODES = 2;
String hosts[] = new String[]{"localhost", "localhost"};
int ports[] = new int[]{7000, 7001};
for (int i = 0; i < NUM_NODES; i++) {
  ParticipantId participantId = ParticipantId.from(hosts[i] + "_" + ports[i]);

  // set additional configuration for the participant; these can be accessed during node start up
  UserConfig userConfig = new UserConfig(Scope.participant(participantId));
  userConfig.setSimpleField("key", "value");

  // configure and add the participant
  ParticipantConfig participantConfig = new ParticipantConfig.Builder(participantId)
      .hostName(hosts[i]).port(ports[i]).enabled(true).userConfig(userConfig).build();
  clusterAccessor.addParticipantToCluster(participantConfig);
}
OR
Using the HelixAdmin Java interface:
String CLUSTER_NAME = "helix-demo";
int NUM_NODES = 2;
String hosts[] = new String[]{"localhost", "localhost"};
// ports are kept as strings because they are passed to setPort and used in the instance name
String ports[] = new String[]{"7000", "7001"};
for (int i = 0; i < NUM_NODES; i++) {
  InstanceConfig instanceConfig = new InstanceConfig(hosts[i] + "_" + ports[i]);
  instanceConfig.setHostName(hosts[i]);
  instanceConfig.setPort(ports[i]);
  instanceConfig.setInstanceEnabled(true);

  // Add additional system-specific configuration if needed. These can be accessed during node start up.
  instanceConfig.getRecord().setSimpleField("key", "value");
  admin.addInstance(CLUSTER_NAME, instanceConfig);
}
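The hostname_port convention can be sketched in plain Java, independent of Helix. The class `NodeIdSketch` and its methods are hypothetical helpers introduced only for illustration. They build the identifiers and reject duplicates, since each node must be uniquely identifiable.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Helix: builds hostname_port identifiers
// and rejects duplicates, since every node must be uniquely identifiable.
class NodeIdSketch {
    static String nodeId(String host, int port) {
        return host + "_" + port;
    }

    static List<String> nodeIds(String[] hosts, int[] ports) {
        if (hosts.length != ports.length) {
            throw new IllegalArgumentException("hosts and ports must have the same length");
        }
        Set<String> seen = new HashSet<>();
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < hosts.length; i++) {
            String id = nodeId(hosts[i], ports[i]);
            // Set.add returns false when the id was already present
            if (!seen.add(id)) {
                throw new IllegalArgumentException("duplicate node id: " + id);
            }
            ids.add(id);
        }
        return ids;
    }

    public static void main(String[] args) {
        String[] hosts = {"localhost", "localhost"};
        int[] ports = {7000, 7001};
        System.out.println(nodeIds(hosts, ports)); // prints [localhost_7000, localhost_7001]
    }
}
```

Checking for duplicates before registering nodes surfaces a naming clash early, for example two processes configured with the same host and port.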
A resource represents the actual task performed by the nodes. It can be a database, index, topic, queue or any other processing entity. A resource can be divided into many sub-parts known as partitions.
For scalability and fault tolerance, each partition can have one or more replicas. The state model lets you declare the system behavior by first enumerating the various STATES and the TRANSITIONS between them. A simple model is ONLINE-OFFLINE, where ONLINE means the task is active and OFFLINE means it's not active. You can also specify how many replicas must be in each state; these are known as constraints. For example, in a search system, one might need more than one node serving the same index to handle the load.
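The allowed states in the MASTER-SLAVE model: MASTER, SLAVE, OFFLINE
The allowed transitions: OFFLINE to SLAVE, SLAVE to OFFLINE, SLAVE to MASTER, MASTER to SLAVE
The constraints: no more than 1 MASTER per partition; the number of SLAVE replicas is bounded by the replication factor (R)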
The following snippet shows how to declare the state model and constraints for the MASTER-SLAVE model.
StateModelDefinition.Builder builder = new StateModelDefinition.Builder(STATE_MODEL_NAME);

// Add states and their rank to indicate priority. A lower rank corresponds to a higher priority
builder.addState(MASTER, 1);
builder.addState(SLAVE, 2);
builder.addState(OFFLINE);

// Set the initial state when the node starts
builder.initialState(OFFLINE);

// Add transitions between the states.
builder.addTransition(OFFLINE, SLAVE);
builder.addTransition(SLAVE, OFFLINE);
builder.addTransition(SLAVE, MASTER);
builder.addTransition(MASTER, SLAVE);

// set constraints on states.

// static constraint: upper bound of 1 MASTER
builder.upperBound(MASTER, 1);

// dynamic constraint: R means it should be derived based on the replication factor for the cluster
// this allows a different replication factor for each resource without
// having to define a new state model
builder.dynamicUpperBound(SLAVE, "R");

StateModelDefinition stateModelDefinition = builder.build();
Then, add the state model definition:
clusterAccessor.addStateModelDefinitionToCluster(stateModelDefinition);
OR
admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, stateModelDefinition);
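To make the declared states, transitions, and constraints concrete, here is a minimal plain-Java sketch of the MASTER-SLAVE model. It does not use Helix. The class `StateModelSketch` and its methods are hypothetical names chosen for illustration, and resolving "R" to the replica count is a simplifying assumption about how a dynamic bound is interpreted.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative model only; class and method names are hypothetical, not Helix APIs.
class StateModelSketch {
    private final Set<String> transitions = new HashSet<>();
    private final Map<String, String> upperBounds = new HashMap<>();

    void addTransition(String from, String to) {
        transitions.add(from + "->" + to);
    }

    // A bound is either a number such as "1" or "R" for the replication factor.
    void upperBound(String state, String bound) {
        upperBounds.put(state, bound);
    }

    // Only directly declared transitions are allowed.
    boolean isAllowed(String from, String to) {
        return transitions.contains(from + "->" + to);
    }

    // Resolve the bound for a state; states without a bound are treated as unlimited.
    int resolveBound(String state, int replicas) {
        String bound = upperBounds.get(state);
        if (bound == null) {
            return Integer.MAX_VALUE;
        }
        return bound.equals("R") ? replicas : Integer.parseInt(bound);
    }

    static StateModelSketch masterSlave() {
        StateModelSketch model = new StateModelSketch();
        model.addTransition("OFFLINE", "SLAVE");
        model.addTransition("SLAVE", "OFFLINE");
        model.addTransition("SLAVE", "MASTER");
        model.addTransition("MASTER", "SLAVE");
        model.upperBound("MASTER", "1"); // static constraint
        model.upperBound("SLAVE", "R");  // dynamic constraint
        return model;
    }

    public static void main(String[] args) {
        StateModelSketch model = masterSlave();
        System.out.println(model.isAllowed("OFFLINE", "MASTER")); // prints false
        System.out.println(model.isAllowed("SLAVE", "MASTER"));   // prints true
        System.out.println(model.resolveBound("SLAVE", 2));       // prints 2
    }
}
```

Because OFFLINE to MASTER is not declared, a replica in this sketch has to pass through SLAVE before it can become MASTER.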
The final goal of Helix is to ensure that the constraints on the state model are satisfied. Helix does this by assigning a state to a partition (such as MASTER, SLAVE), and placing it on a particular node.
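Helix can operate in three assignment modes: FULL_AUTO (Helix decides both the placement and the state of a partition), SEMI_AUTO (the application decides the placement, and Helix decides the state), and CUSTOMIZED (the application controls both the placement and the state).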
For more information on the assignment modes, see the Rebalancing Algorithms section of the tutorial.
Here is an example of adding the resource in SEMI_AUTO mode (i.e. locations of partitions are specified a priori):
int NUM_PARTITIONS = 6;
int NUM_REPLICAS = 2;
ResourceId resourceId = ResourceId.from("MyDB");
SemiAutoRebalancerContext context = new SemiAutoRebalancerContext.Builder(resourceId)
    .replicaCount(NUM_REPLICAS).addPartitions(NUM_PARTITIONS)
    .stateModelDefId(stateModelDefinition.getStateModelDefId())
    .addPreferenceList(partition1Id, preferenceList) // preferred locations of each partition
    // add other preference lists per partition
    .build();

// or add all preference lists at once if desired (map of PartitionId to List of ParticipantId)
context.setPreferenceLists(preferenceLists);

// or generate a default set of preference lists given the set of all participants
context.generateDefaultConfiguration(stateModelDefinition, participantIdSet);

// add the resource to the cluster
ResourceConfig resourceConfig = new ResourceConfig.Builder(resourceId)
    .rebalancerContext(context)
    .build();
clusterAccessor.addResourceToCluster(resourceConfig);
OR
String RESOURCE_NAME = "MyDB";
int NUM_PARTITIONS = 6;
String MODE = "SEMI_AUTO";
int NUM_REPLICAS = 2;

admin.addResource(CLUSTER_NAME, RESOURCE_NAME, NUM_PARTITIONS, STATE_MODEL_NAME, MODE);

// specify the preference lists yourself
IdealState idealState = admin.getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME);
idealState.setPreferenceList(partitionId, preferenceList); // preferred locations of each partition
// add other preference lists per partition

// or add all preference lists at once if desired
idealState.getRecord().setListFields(preferenceLists);
admin.setResourceIdealState(CLUSTER_NAME, RESOURCE_NAME, idealState);

// or generate a default set of preference lists
admin.rebalance(CLUSTER_NAME, RESOURCE_NAME, NUM_REPLICAS);
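As a rough illustration of what a preference list expresses in SEMI_AUTO mode, the following plain-Java sketch generates round-robin preference lists and then derives states from them. It does not use Helix, and it is not Helix's actual rebalancing algorithm. The class `SemiAutoSketch`, the round-robin placement, and the rule "first live node in the list becomes MASTER" are simplifying assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Helix code: the application fixes the placement
// (preference lists) and the states are derived from which nodes are live.
class SemiAutoSketch {
    // Round-robin placement: partition p prefers nodes p, p+1, ... (mod node count).
    static Map<String, List<String>> defaultPreferenceLists(
            String resource, int partitions, int replicas, List<String> nodes) {
        Map<String, List<String>> lists = new LinkedHashMap<>();
        int perPartition = Math.min(replicas, nodes.size());
        for (int p = 0; p < partitions; p++) {
            List<String> preferred = new ArrayList<>();
            for (int r = 0; r < perPartition; r++) {
                preferred.add(nodes.get((p + r) % nodes.size()));
            }
            lists.put(resource + "_" + p, preferred);
        }
        return lists;
    }

    // The first live node in the list becomes MASTER; other live nodes become SLAVE.
    static Map<String, String> assignStates(List<String> preferenceList, Set<String> liveNodes) {
        Map<String, String> states = new LinkedHashMap<>();
        for (String node : preferenceList) {
            if (!liveNodes.contains(node)) {
                continue; // nodes that are down hold no replica state
            }
            states.put(node, states.isEmpty() ? "MASTER" : "SLAVE");
        }
        return states;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("localhost_7000", "localhost_7001");
        Map<String, List<String>> lists = defaultPreferenceLists("MyDB", 6, 2, nodes);
        System.out.println(lists.get("MyDB_1")); // prints [localhost_7001, localhost_7000]

        // Only localhost_7000 is live, so it takes over as MASTER for MyDB_1.
        Set<String> live = Collections.singleton("localhost_7000");
        System.out.println(assignStates(lists.get("MyDB_1"), live)); // prints {localhost_7000=MASTER}
    }
}
```

The point of the sketch is the division of labor: the preference list decides where replicas may live, while the set of live nodes decides which replica holds which state at any moment.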