Helix Tutorial: User-Defined Rebalancing

Helix can compute both the location and the state of replicas internally using its default fully automatic rebalancer, but specific applications may require rebalancing strategies that optimize for different requirements. Because one of the main design goals of Helix is to provide maximum flexibility to any distributed application, it allows applications to plug in arbitrary rebalancer algorithms that implement a provided interface. An application developer can therefore fully implement the rebalancer, which is the core constraint solver in the system.

Whenever the state of the cluster changes, for example when participants join or leave the cluster, Helix automatically calls the rebalancer to compute a new mapping of all the replicas in the resource. When using a pluggable rebalancer, the only required step is to register it with Helix; no additional bootstrapping steps are necessary. Helix uses reflection to look up and load the class dynamically at runtime. As a result, it is also technically possible to change the rebalancing strategy at any time.
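The reflective lookup described above can be pictured with plain Java. The following is a minimal sketch, not Helix's actual loading code: the SketchRebalancer interface, the ModRebalancer class, and the RebalancerLoader.load helper are hypothetical names invented for illustration, and the sketch assumes the class has a no-argument constructor.

```java
/** Hypothetical stand-in for a rebalancer interface; not a Helix type. */
interface SketchRebalancer {
  String name();
}

/** Hypothetical implementation that is looked up by its class name. */
class ModRebalancer implements SketchRebalancer {
  @Override
  public String name() {
    return "mod-based";
  }
}

final class RebalancerLoader {
  private RebalancerLoader() {
  }

  /**
   * Loads the named class, checks that it implements SketchRebalancer, and
   * instantiates it through its no-argument constructor. Any failure (class
   * missing at runtime, wrong type, no usable constructor) is reported as an
   * IllegalArgumentException.
   */
  static SketchRebalancer load(String className) {
    try {
      Class<?> raw = Class.forName(className);
      Class<? extends SketchRebalancer> typed = raw.asSubclass(SketchRebalancer.class);
      return typed.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalArgumentException("Cannot load rebalancer class " + className, e);
    }
  }

  public static void main(String[] args) {
    // The class is identified only by its fully-qualified name, as in the registration step.
    SketchRebalancer rebalancer = load(ModRebalancer.class.getName());
    System.out.println(rebalancer.name());
  }
}
```

Because the class is resolved by name only when it is needed, a name that cannot be resolved surfaces as a runtime failure rather than a compile-time error.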

The HelixRebalancer interface is as follows:

public void init(HelixManager helixManager);

public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
    ResourceCurrentState currentState);

The first parameter is the configuration of the resource to rebalance, the second is a full cache of all of the cluster data available to Helix, and the third is a snapshot of the actual current placements and state assignments. From the cluster variable, it is also possible to access the ResourceAssignment last generated by this rebalancer. Internally, Helix implements the same interface for its own rebalancing routines, so a user-defined rebalancer has access to the same information about the cluster as an internal implementation. This lets applications implement algorithms that may need a large portion of the entire state of the cluster to make the best possible placement and state assignment decisions.

A ResourceAssignment is a full representation of the location and the state of each replica of each partition of a given resource. This is a simple representation of the placement that the algorithm believes is the best possible. If the placement meets all defined constraints, this is what will become the actual state of the distributed system.

Rebalancer Context

Helix provides an interface called RebalancerContext. A base class called PartitionedRebalancerContext contains all of the basic properties required for a partitioned resource and serves as the common base for the four main rebalancing modes. Helix provides three derived classes of PartitionedRebalancerContext: FullAutoRebalancerContext, SemiAutoRebalancerContext, and CustomizedRebalancerContext. If none of these works for your application, you can create your own class that derives from PartitionedRebalancerContext (or even one that only implements RebalancerContext).

Specifying a Rebalancer

Using Logical Accessors

To specify the rebalancer, use PartitionedRebalancerContext#setRebalancerRef(RebalancerRef) to select the rebalancer class implementation. For example, here is a PartitionedRebalancerContext constructed through its builder with a user-specified class:

RebalancerRef rebalancerRef = RebalancerRef.from(className);
PartitionedRebalancerContext rebalanceContext =
    new PartitionedRebalancerContext.Builder(resourceId).replicaCount(1).addPartition(partition1)
        .addPartition(partition2).stateModelDefId(stateModelDef.getStateModelDefId())
        .rebalancerRef(rebalancerRef).build();

The class name is a fully-qualified class name consisting of its package and its name, and the class should implement the Rebalancer interface. Now, the context can be added to a ResourceConfig through ResourceConfig.Builder#rebalancerContext(RebalancerContext) and the context will automatically be made available to the rebalancer for all subsequent executions.

Using HelixAdmin

For implementations that set up the cluster through existing code, the following HelixAdmin calls will update the Rebalancer class:

IdealState idealState = helixAdmin.getResourceIdealState(clusterName, resourceName);
idealState.setRebalanceMode(RebalanceMode.USER_DEFINED);
idealState.setRebalancerClassName(className);
helixAdmin.setResourceIdealState(clusterName, resourceName, idealState);

Two key fields must be set to specify that a pluggable rebalancer should be used. First, the rebalance mode should be set to USER_DEFINED. Second, the rebalancer class name should be set to a class that implements Rebalancer and is available at runtime (that is, on the classpath). The class name is a fully-qualified class name consisting of its package and its name.

Using YAML

Alternatively, the rebalancer class name can be specified in a YAML file representing the cluster configuration. The requirements are the same, but the representation is more compact. Below are the first few lines of an example YAML file. To see a full YAML specification, see the YAML tutorial.

clusterName: lock-manager-custom-rebalancer # unique name for the cluster
resources:
  - name: lock-group # unique resource name
    rebalancer: # we will provide our own rebalancer
      mode: USER_DEFINED
      class: domain.project.helix.rebalancer.UserDefinedRebalancerClass
...

Example

We demonstrate plugging in a simple user-defined rebalancer as part of a revisit of the distributed lock manager example. It includes a functional Rebalancer implementation, as well as the entire YAML file used to define the cluster.

Consider the case where partitions are locks in a lock manager, 6 locks are to be distributed evenly to a set of participants, and only one participant can hold each lock. We can define a rebalancing algorithm that simply takes the lock number modulo the number of participants to distribute the locks evenly across participants. Helix allows capping the number of partitions a participant can accept, but since locks are lightweight, we do not need to define a restriction in this case. The following is a succinct implementation of this algorithm.

@Override
public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
    ResourceCurrentState currentState) {
  // Get the rebalancer context (a basic partitioned one)
  PartitionedRebalancerContext context = rebalancerConfig.getRebalancerContext(
      PartitionedRebalancerContext.class);

  // Initialize an empty mapping of locks to participants
  ResourceAssignment assignment = new ResourceAssignment(context.getResourceId());

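  // Get the list of live participants in the cluster
  List<ParticipantId> liveParticipants = new ArrayList<ParticipantId>(
      cluster.getLiveParticipantMap().keySet());
  if (liveParticipants.isEmpty()) {
    // No participant can hold a lock; this also avoids a modulo by zero in the loop below
    return assignment;
  }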

  // Get the state model (should be a simple lock/unlock model) and the highest-priority state
  StateModelDefId stateModelDefId = context.getStateModelDefId();
  StateModelDefinition stateModelDef = cluster.getStateModelMap().get(stateModelDefId);
  if (stateModelDef.getStatesPriorityList().size() < 1) {
    LOG.error("Invalid state model definition. There should be at least one state.");
    return assignment;
  }
  State lockState = stateModelDef.getTypedStatesPriorityList().get(0);

  // Count the number of participants allowed to lock each lock
  String stateCount = stateModelDef.getNumParticipantsPerState(lockState);
  int lockHolders = 0;
  try {
    // a numeric value is a custom-specified number of participants allowed to lock the lock
    lockHolders = Integer.parseInt(stateCount);
  } catch (NumberFormatException e) {
    LOG.error("Invalid state model definition. The lock state does not have a valid count");
    return assignment;
  }

  // Fairly assign the lock state to the participants using a simple mod-based sequential
  // assignment. For instance, if each lock can be held by 3 participants, lock 0 would be held
  // by participants (0, 1, 2), lock 1 would be held by (1, 2, 3), and so on, wrapping around the
  // number of participants as necessary.
  // This assumes a simple lock-unlock model where the only state of interest is which nodes have
  // acquired each lock.
  int i = 0;
  for (PartitionId partition : context.getPartitionSet()) {
    Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
    for (int j = i; j < i + lockHolders; j++) {
      int participantIndex = j % liveParticipants.size();
      ParticipantId participant = liveParticipants.get(participantIndex);
      // enforce that a participant can only have one instance of a given lock
      if (!replicaMap.containsKey(participant)) {
        replicaMap.put(participant, lockState);
      }
    }
    assignment.addReplicaMap(partition, replicaMap);
    i++;
  }
  return assignment;
}
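
To experiment with the assignment logic without a running cluster, the same mod-based loop can be reproduced with plain Java collections. This is only a sketch under stated assumptions: ModAssignmentSketch, assign, and the string names for locks and participants are hypothetical stand-ins for the Helix types used above, and the participant list is assumed to arrive in a fixed order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ModAssignmentSketch {
  private ModAssignmentSketch() {
  }

  /**
   * Maps each lock to the participants that hold it, using the same wrap-around
   * walk as the rebalancer above: lock i starts at participant i % n and takes
   * up to lockHolders consecutive participants.
   */
  static Map<String, List<String>> assign(List<String> locks, List<String> participants,
      int lockHolders) {
    Map<String, List<String>> assignment = new LinkedHashMap<>();
    for (int i = 0; i < locks.size(); i++) {
      List<String> holders = new ArrayList<>();
      if (!participants.isEmpty()) {
        for (int j = i; j < i + lockHolders; j++) {
          String participant = participants.get(j % participants.size());
          // a participant can hold at most one instance of a given lock
          if (!holders.contains(participant)) {
            holders.add(participant);
          }
        }
      }
      assignment.put(locks.get(i), holders);
    }
    return assignment;
  }

  public static void main(String[] args) {
    List<String> locks =
        Arrays.asList("lock_0", "lock_1", "lock_2", "lock_3", "lock_4", "lock_5");
    List<String> participants =
        Arrays.asList("Participant_A", "Participant_B", "Participant_C");
    // One holder per lock, as in the lock manager example
    System.out.println(assign(locks, participants, 1));
  }
}
```

Calling assign with a shorter or longer participant list shows how the mapping shifts when membership changes.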

Here is the ResourceAssignment emitted by the user-defined rebalancer after each change to the set of participants in a 3-participant system.

  • Participant_A joins
{
  "lock_0": { "Participant_A": "LOCKED"},
  "lock_1": { "Participant_A": "LOCKED"},
  "lock_2": { "Participant_A": "LOCKED"},
  "lock_3": { "Participant_A": "LOCKED"},
  "lock_4": { "Participant_A": "LOCKED"},
  "lock_5": { "Participant_A": "LOCKED"}
}

A ResourceAssignment maps each partition of a resource to the participants serving its replicas and to the state of each replica. The state model is a simple LOCKED/RELEASED model, so Participant_A holds all lock partitions in the LOCKED state.

  • Participant_B joins
{
  "lock_0": { "Participant_A": "LOCKED"},
  "lock_1": { "Participant_B": "LOCKED"},
  "lock_2": { "Participant_A": "LOCKED"},
  "lock_3": { "Participant_B": "LOCKED"},
  "lock_4": { "Participant_A": "LOCKED"},
  "lock_5": { "Participant_B": "LOCKED"}
}

Now that there are two participants, the simple mod-based function assigns every other lock to the second participant. On any system change, the rebalancer is invoked so that the application can define how to redistribute its resources.

  • Participant_C joins (steady state)
{
  "lock_0": { "Participant_A": "LOCKED"},
  "lock_1": { "Participant_B": "LOCKED"},
  "lock_2": { "Participant_C": "LOCKED"},
  "lock_3": { "Participant_A": "LOCKED"},
  "lock_4": { "Participant_B": "LOCKED"},
  "lock_5": { "Participant_C": "LOCKED"}
}

This is the steady state of the system. Notice that four of the six locks (lock_2 through lock_5) now have a different owner. This is a consequence of the naïve modulus-based assignment approach used by the user-defined rebalancer. However, the interface is flexible enough to allow you to employ consistent hashing or any other scheme if minimal movement is a system requirement.
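
As one illustration of a scheme with less movement, the sketch below uses rendezvous (highest-random-weight) hashing, a close relative of consistent hashing, rather than a hash ring. It is not part of Helix or of the lock manager example: RendezvousSketch, owner, and score are hypothetical names, and the scoring function is an arbitrary choice made for this sketch. Each lock goes to the participant with the highest score for that lock, so adding a participant can only move locks to the newcomer, and removing one only moves the locks it held.

```java
import java.util.Arrays;
import java.util.List;

final class RendezvousSketch {
  private RendezvousSketch() {
  }

  /** Deterministic pseudo-random score for a (lock, participant) pair. */
  static long score(String lock, String participant) {
    long h = (lock + '/' + participant).hashCode();
    // spread the 32-bit hash over 64 bits so similar names get dissimilar scores
    h *= 0x9E3779B97F4A7C15L;
    h ^= h >>> 32;
    return h;
  }

  /** Returns the highest-scoring participant for the lock, or null if there are none. */
  static String owner(String lock, List<String> participants) {
    String best = null;
    long bestScore = 0L;
    for (String participant : participants) {
      long s = score(lock, participant);
      // ties are broken by name so the result does not depend on list order
      if (best == null || s > bestScore
          || (s == bestScore && participant.compareTo(best) < 0)) {
        best = participant;
        bestScore = s;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    List<String> two = Arrays.asList("Participant_A", "Participant_B");
    List<String> three = Arrays.asList("Participant_A", "Participant_B", "Participant_C");
    for (int i = 0; i < 6; i++) {
      String lock = "lock_" + i;
      System.out.println(lock + ": " + owner(lock, two) + " -> " + owner(lock, three));
    }
  }
}
```

The trade-off is that the distribution is only even on average, whereas the mod-based approach is exactly even but reshuffles more locks on each membership change.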

  • Participant_B fails
{
  "lock_0": { "Participant_A": "LOCKED"},
  "lock_1": { "Participant_C": "LOCKED"},
  "lock_2": { "Participant_A": "LOCKED"},
  "lock_3": { "Participant_C": "LOCKED"},
  "lock_4": { "Participant_A": "LOCKED"},
  "lock_5": { "Participant_C": "LOCKED"}
}

On any node failure, as in the case of node addition, the rebalancer is invoked automatically so that it can generate a new mapping as a response to the change. Helix ensures that the Rebalancer has the opportunity to reassign locks as required by the application.

  • Participant_B (or the replacement for the original Participant_B) rejoins
{
  "lock_0": { "Participant_A": "LOCKED"},
  "lock_1": { "Participant_B": "LOCKED"},
  "lock_2": { "Participant_C": "LOCKED"},
  "lock_3": { "Participant_A": "LOCKED"},
  "lock_4": { "Participant_B": "LOCKED"},
  "lock_5": { "Participant_C": "LOCKED"}
}

The rebalancer was invoked once again and the resulting ResourceAssignment reflects the steady state.

Caveats

  • The rebalancer class must be available at runtime; otherwise, Helix will not attempt to rebalance at all.