Helix Tutorial

This tutorial covers the roles in a Helix-managed cluster and shows the code you need to write to integrate with it. In many cases the default behavior is appropriate, but you can also customize it.

Convention: we first cover the basic approach, which is the easiest to implement. Then, we'll describe advanced options, which give you more control over the system behavior, but require you to write more code.

Prerequisites

  1. Read Concepts/Terminology and Architecture
  2. Read the Quickstart guide to learn how Helix models and manages a cluster
  3. Install Helix from source; see the Quickstart guide for the steps.

Tutorial Outline

  1. Participant
  2. Spectator
  3. Controller
  4. Rebalancing Algorithms
  5. User-Defined Rebalancing
  6. State Machines
  7. Messaging
  8. Customized Health Check
  9. Throttling
  10. Application Property Store
  11. Logical Accessors
  12. Admin Interface
  13. YAML Cluster Setup
  14. Helix Agent (for non-JVM systems)

Preliminaries

First, we need to set up the system. Let's walk through the steps of building a distributed system using Helix. We will show how to do this using both the Java admin interface and the cluster accessor interface. Choose whichever interface most closely matches your needs.

Start ZooKeeper

The following command starts ZooKeeper in standalone mode on port 2199. For production deployments, see the Apache ZooKeeper documentation for instructions.

./start-standalone-zookeeper.sh 2199 &

Create a Cluster

Creating a cluster defines it in the appropriate ZNodes on ZooKeeper.

Using the Java accessor API:

// Note: ZK_ADDRESS is the host:port of ZooKeeper
String ZK_ADDRESS = "localhost:2199";
HelixConnection connection = new ZKHelixConnection(ZK_ADDRESS);

ClusterId clusterId = ClusterId.from("helix-demo");
ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
ClusterConfig clusterConfig = new ClusterConfig.Builder(clusterId).build();
clusterAccessor.createCluster(clusterConfig);

OR

Using the HelixAdmin Java interface:

// Create a HelixAdmin instance
// Note: ZK_ADDRESS is the host:port of ZooKeeper
String ZK_ADDRESS = "localhost:2199";
HelixAdmin admin = new ZKHelixAdmin(ZK_ADDRESS);

String CLUSTER_NAME = "helix-demo";
// Create the cluster namespace in ZooKeeper
admin.addCluster(CLUSTER_NAME);

OR

Using the command-line interface:

./helix-admin.sh --zkSvr localhost:2199 --addCluster helix-demo

Configure the Nodes of the Cluster

First, add the nodes to the cluster and configure them. Each node in the cluster must be uniquely identifiable; the most commonly used convention is hostname_port.

Using the Java accessor API:

int NUM_NODES = 2;
String hosts[] = new String[]{"localhost","localhost"};
int ports[] = new int[]{7000,7001};
for (int i = 0; i < NUM_NODES; i++)
{
  ParticipantId participantId = ParticipantId.from(hosts[i] + "_" + ports[i]);

  // set additional configuration for the participant; these can be accessed during node start up
  UserConfig userConfig = new UserConfig(Scope.participant(participantId));
  userConfig.setSimpleField("key", "value");

  // configure and add the participant
  ParticipantConfig participantConfig = new ParticipantConfig.Builder(participantId)
      .hostName(hosts[i]).port(ports[i]).enabled(true).userConfig(userConfig).build();
  clusterAccessor.addParticipantToCluster(participantConfig);
}

OR

Using the HelixAdmin Java interface:

String CLUSTER_NAME = "helix-demo";
int NUM_NODES = 2;
String hosts[] = new String[]{"localhost","localhost"};
String ports[] = new String[]{"7000","7001"}; // ports are Strings here, so they must be quoted
for (int i = 0; i < NUM_NODES; i++)
{
  InstanceConfig instanceConfig = new InstanceConfig(hosts[i] + "_" + ports[i]);
  instanceConfig.setHostName(hosts[i]);
  instanceConfig.setPort(ports[i]);
  instanceConfig.setInstanceEnabled(true);

  // Add additional system-specific configuration if needed; it can be accessed during node start-up.
  instanceConfig.getRecord().setSimpleField("key", "value");
  admin.addInstance(CLUSTER_NAME, instanceConfig);
}
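
Both snippets above rely on the hostname_port convention to keep node names unique. As a minimal sketch that does not use Helix at all, the check below builds the names and rejects duplicates before anything is registered. The class name NodeIds and its build method are hypothetical helpers introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Helix: builds hostname_port ids and rejects duplicates.
final class NodeIds {
  static List<String> build(String[] hosts, int[] ports) {
    if (hosts.length != ports.length) {
      throw new IllegalArgumentException("hosts and ports must have the same length");
    }
    Set<String> seen = new HashSet<>();
    List<String> ids = new ArrayList<>();
    for (int i = 0; i < hosts.length; i++) {
      String id = hosts[i] + "_" + ports[i];
      // Set.add returns false when the id was already present
      if (!seen.add(id)) {
        throw new IllegalArgumentException("duplicate node id: " + id);
      }
      ids.add(id);
    }
    return ids;
  }

  public static void main(String[] args) {
    // prints [localhost_7000, localhost_7001]
    System.out.println(build(new String[] {"localhost", "localhost"}, new int[] {7000, 7001}));
  }
}
```

Running the check up front gives a clear error for a misconfigured host list instead of a failure part-way through registration.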

Configure the Resource

A resource represents the actual task performed by the nodes. It can be a database, index, topic, queue or any other processing entity. A resource can be divided into many sub-parts known as partitions.

Define the State Model and Constraints

For scalability and fault tolerance, each partition can have one or more replicas. The state model lets you declare the system behavior by enumerating the possible STATES and the TRANSITIONS between them. A simple model is ONLINE-OFFLINE, where ONLINE means the task is active and OFFLINE means it is not. You can also specify how many replicas must be in each state; these limits are known as constraints. For example, in a search system, one might need more than one node serving the same index to handle the load.

The MASTER-SLAVE model used in the rest of this section allows the following states:

  • MASTER
  • SLAVE
  • OFFLINE

The allowed transitions:

  • OFFLINE to SLAVE
  • SLAVE to OFFLINE
  • SLAVE to MASTER
  • MASTER to SLAVE

The constraints:

  • no more than 1 MASTER per partition
  • the rest of the replicas should be slaves
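
To make the lists above concrete, here is a small plain-Java sketch, independent of the Helix API, that encodes the allowed transitions and the single-MASTER constraint. The class MasterSlaveModel and its methods are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the MASTER-SLAVE rules; this is not Helix code.
final class MasterSlaveModel {
  // Allowed transitions, written as "FROM->TO"
  static final Set<String> TRANSITIONS = new HashSet<>(Arrays.asList(
      "OFFLINE->SLAVE", "SLAVE->OFFLINE", "SLAVE->MASTER", "MASTER->SLAVE"));

  // True when a replica may move directly from one state to the other
  static boolean isAllowed(String from, String to) {
    return TRANSITIONS.contains(from + "->" + to);
  }

  // True when the replica states of one partition contain at most one MASTER
  static boolean satisfiesConstraints(List<String> replicaStates) {
    long masters = replicaStates.stream().filter("MASTER"::equals).count();
    return masters <= 1;
  }

  public static void main(String[] args) {
    System.out.println(isAllowed("OFFLINE", "SLAVE"));  // true
    System.out.println(isAllowed("OFFLINE", "MASTER")); // false: must pass through SLAVE
    System.out.println(satisfiesConstraints(Arrays.asList("MASTER", "MASTER"))); // false
  }
}
```

Note that there is no direct OFFLINE to MASTER transition, so a replica that starts OFFLINE has to become a SLAVE before it can be promoted.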

The following snippet shows how to declare the state model and constraints for the MASTER-SLAVE model.

StateModelDefinition.Builder builder = new StateModelDefinition.Builder(STATE_MODEL_NAME);

// Add states and their rank to indicate priority. A lower rank corresponds to a higher priority
builder.addState(MASTER, 1);
builder.addState(SLAVE, 2);
builder.addState(OFFLINE);

// Set the initial state when the node starts
builder.initialState(OFFLINE);

// Add transitions between the states.
builder.addTransition(OFFLINE, SLAVE);
builder.addTransition(SLAVE, OFFLINE);
builder.addTransition(SLAVE, MASTER);
builder.addTransition(MASTER, SLAVE);

// set constraints on states.

// static constraint: upper bound of 1 MASTER
builder.upperBound(MASTER, 1);

// dynamic constraint: "R" means the bound is derived from the configured replication factor.
// This allows a different replication factor for each resource without
// having to define a new state model
builder.dynamicUpperBound(SLAVE, "R");
StateModelDefinition stateModelDefinition = builder.build();
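
One way to picture how the static and dynamic upper bounds in the snippet above interact is the plain-Java sketch below. It assumes, as a simplification, that states are filled in rank order, that "R" resolves to the replica count of the resource, and that each replica holds exactly one state. The class UpperBounds and its method are hypothetical and do not reproduce Helix's internal logic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of upper-bound resolution; this is not Helix code.
final class UpperBounds {
  // boundsByRank must iterate from the highest-priority state to the lowest.
  // A bound is either a number such as "1" or the symbol "R" (assumed: replica count).
  static Map<String, Integer> countsPerState(Map<String, String> boundsByRank, int replicas) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    int remaining = replicas;
    for (Map.Entry<String, String> entry : boundsByRank.entrySet()) {
      int bound = "R".equals(entry.getValue()) ? replicas : Integer.parseInt(entry.getValue());
      // a state gets no more replicas than its bound, and no more than are left
      int assigned = Math.min(bound, remaining);
      counts.put(entry.getKey(), assigned);
      remaining -= assigned;
    }
    return counts;
  }

  public static void main(String[] args) {
    Map<String, String> bounds = new LinkedHashMap<>();
    bounds.put("MASTER", "1");
    bounds.put("SLAVE", "R");
    // prints {MASTER=1, SLAVE=2}
    System.out.println(countsPerState(bounds, 3));
  }
}
```

With three replicas, the static bound caps MASTER at one and the remaining two replicas fall under the "R" bound for SLAVE, which is why the same state model works for resources with different replication factors.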

Then, add the state model definition:

clusterAccessor.addStateModelDefinitionToCluster(stateModelDefinition);

OR

admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, stateModelDefinition);

Assigning Partitions to Nodes

The final goal of Helix is to ensure that the constraints on the state model are satisfied. Helix does this by assigning a state (such as MASTER or SLAVE) to each replica of a partition and placing it on a particular node.

Helix can operate in three assignment modes:

  • FULL_AUTO: Helix decides the placement and state of a partition.
  • SEMI_AUTO: Application decides the placement but Helix decides the state of a partition.
  • CUSTOMIZED: Application controls the placement and state of a partition.

For more information on the assignment modes, see the Rebalancing Algorithms section of the tutorial.
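
As a rough illustration of the SEMI_AUTO split, where the application chooses the placement and Helix chooses the state, the plain-Java sketch below derives states for one partition from a preference list. It assumes that the first live node in the list takes the highest-priority state and the remaining live nodes become SLAVE; the class SemiAutoSketch is hypothetical and is not the Helix rebalancer.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of SEMI_AUTO state selection; this is not Helix code.
final class SemiAutoSketch {
  // preferenceList: nodes chosen by the application, most preferred first
  // liveNodes: nodes that are currently up
  static Map<String, String> assignStates(List<String> preferenceList, Set<String> liveNodes) {
    Map<String, String> states = new LinkedHashMap<>();
    boolean masterAssigned = false;
    for (String node : preferenceList) {
      if (!liveNodes.contains(node)) {
        continue; // a node that is down holds no active replica
      }
      states.put(node, masterAssigned ? "SLAVE" : "MASTER");
      masterAssigned = true;
    }
    return states;
  }

  public static void main(String[] args) {
    List<String> preferenceList = Arrays.asList("localhost_7000", "localhost_7001");
    Set<String> allLive = new HashSet<>(preferenceList);
    // prints {localhost_7000=MASTER, localhost_7001=SLAVE}
    System.out.println(assignStates(preferenceList, allLive));
    // prints {localhost_7001=MASTER}
    System.out.println(assignStates(preferenceList, Collections.singleton("localhost_7001")));
  }
}
```

The second call shows the point of the mode: the placement stays fixed, but the state moves to the next preferred node when the first one is unavailable.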

Here is an example of adding the resource in SEMI_AUTO mode (i.e. locations of partitions are specified a priori):

int NUM_PARTITIONS = 6;
int NUM_REPLICAS = 2;
ResourceId resourceId = ResourceId.from("MyDB");

SemiAutoRebalancerContext context = new SemiAutoRebalancerContext.Builder(resourceId)
  .replicaCount(NUM_REPLICAS).addPartitions(NUM_PARTITIONS)
  .stateModelDefId(stateModelDefinition.getStateModelDefId())
  .addPreferenceList(partition1Id, preferenceList) // preferred locations of each partition
  // add other preference lists per partition
  .build();

// or add all preference lists at once if desired (map of PartitionId to List of ParticipantId)
context.setPreferenceLists(preferenceLists);

// or generate a default set of preference lists given the set of all participants
context.generateDefaultConfiguration(stateModelDefinition, participantIdSet);

// add the resource to the cluster
ResourceConfig resourceConfig = new ResourceConfig.Builder(resourceId)
  .rebalancerContext(context)
  .build();
clusterAccessor.addResourceToCluster(resourceConfig);

OR

String RESOURCE_NAME = "MyDB";
int NUM_PARTITIONS = 6;
String MODE = "SEMI_AUTO";
int NUM_REPLICAS = 2;

admin.addResource(CLUSTER_NAME, RESOURCE_NAME, NUM_PARTITIONS, STATE_MODEL_NAME, MODE);

// specify the preference lists yourself
IdealState idealState = admin.getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME);
idealState.setPreferenceList(partitionId, preferenceList); // preferred locations of each partition
// add other preference lists per partition

// or add all preference lists at once if desired
idealState.getRecord().setListFields(preferenceLists);
admin.setResourceIdealState(CLUSTER_NAME, RESOURCE_NAME, idealState);

// or generate a default set of preference lists
admin.rebalance(CLUSTER_NAME, RESOURCE_NAME, NUM_REPLICAS);
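
To give a feel for what a map of preference lists looks like, the plain-Java sketch below builds one with a simple round-robin scheme. This is an assumption made for illustration and not the algorithm Helix uses to generate its defaults; the class PreferenceLists is hypothetical, and the resourceName_index partition names are likewise an assumed naming scheme.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical round-robin placement; not the Helix default algorithm.
final class PreferenceLists {
  static Map<String, List<String>> roundRobin(
      String resource, int partitions, int replicas, List<String> nodes) {
    if (replicas > nodes.size()) {
      throw new IllegalArgumentException("need at least as many nodes as replicas");
    }
    Map<String, List<String>> lists = new LinkedHashMap<>();
    for (int p = 0; p < partitions; p++) {
      List<String> preferenceList = new ArrayList<>();
      for (int r = 0; r < replicas; r++) {
        // shift the starting node by one per partition so first choices are spread evenly
        preferenceList.add(nodes.get((p + r) % nodes.size()));
      }
      lists.put(resource + "_" + p, preferenceList); // assumed partition naming
    }
    return lists;
  }

  public static void main(String[] args) {
    List<String> nodes = Arrays.asList("localhost_7000", "localhost_7001");
    Map<String, List<String>> lists = roundRobin("MyDB", 6, 2, nodes);
    System.out.println(lists.get("MyDB_0")); // prints [localhost_7000, localhost_7001]
    System.out.println(lists.get("MyDB_1")); // prints [localhost_7001, localhost_7000]
  }
}
```

With six partitions and two nodes, each node is the first choice for three partitions, so the most preferred replicas are balanced across the cluster.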