Merge Waged rebalancer branch code to master. (#724)

* Define the WAGED rebalancer interfaces.

This is the intial check in for the future development of the WAGED rebalancer.
All the components are placeholders. They will be implemented gradually.

* Adding the configuration items of the WAGED rebalancer. (#348)

* Adding the configuration items of the WAGED rebalancer.

Including: Instance Capacity Keys, Rebalance Preferences, Instance Capacity Details, Partition Capacity (the weight) Details.
Also adding test to cover the new configuration items.

* Implement the WAGED rebalancer cluster model (#362)

* Introduce the cluster model classes to support the WAGED rebalancer.

Implement the cluster model classes with the minimum necessary information to support rebalance.
Additional field/logics might be added later once the detailed rebalance logic is implemented.

Also add related tests.

* Change the rebalancer assignment record to be ResourceAssignment instead of IdealState. (#398)

ResourceAssignment fit the usage better. And there will be no unnecessary information to be recorded or read during the rebalance calculation.

* Convert all the internal assignment state objects to be ResourceAssignment. (#399)

This is to avoid unnecessary information being recorded or read.

* Implement Cluster Model Provider. (#392)

* Implement Cluster Model Provider.

The model provider is called in the WAGED rebalancer to generate CLuster Model based on the current cluster status.
The major responsibility of the provider is to parse all the assignable replicas and identify which replicas need to be reassigned. Note that if the current best possible assignment is still valid, the rebalancer won't need to calculate for the partition assignment.

Also, add unit tests to verify the main logic.

* Add ChangeDetector interface and ResourceChangeDetector implementation (#388)

Add ChangeDetector interface and ResourceChangeDetector implementation

In order to efficiently react to changes happening to the cluster in the new WAGED rebalancer, a new component called ChangeDetector was added.

Changelist:
1. Add ChangeDetector interface
2. Implement ResourceChangeDetector
3. Add ResourceChangeCache, a wrapper for critical cluster metadata
4. Add an integration test, TestResourceChangeDetector

* Add cluster level default instance config. (#413)

This config will be applied to the instance when there is no (or empty) capacity configuration in the Instance Config.
Also add unit tests.

* Redefine the hard/soft constraints (#422)

* Refactor the interfaces of hard/soft constraints and a central place to keep the softConstraint weights

* Refine the WAGED rebalancer related interfaces for integration (#431)

* Refine the WAGED rebalancer related interfaces and initial integrate with the BestPossibleStateCalStage.

- Modify the BestPossibleStateCalStage logic to plugin the WAGED rebalancer.
- Refine ClusterModel to integrate with the ClusterDataDetector implementation.
- Enabling getting the changed details for Cluster Config in the change detector. Which is required by the WAGED rebalancer.

* Resubmit the change: Refine the WAGED rebalancer related interfaces for integration (#431)

* Refine the WAGED rebalancer related interfaces and initial integrate with the BestPossibleStateCalStage.

- Modify the BestPossibleStateCalStage logic to plugin the WAGED rebalancer.
- Refine ClusterModel to integrate with the ClusterDataDetector implementation.
- Enabling getting the changed details for Cluster Config in the change detector. Which is required by the WAGED rebalancer.

* Bring back the interface class and algorithm placeholder class that was removed prematurely.

* Revert "Refine the WAGED rebalancer related interfaces for integration (#431)" (#437)

This reverts commit 08a2015c617ddd3c93525afc572081a7836f9476.

* Modify the expected change type from CONFIG to CLUSTER_CONFIG in the WAGED rebalancer. (#438)

CONFIG is for generic configuration items. That will be too generic for the rebalancer.
Modify to check for CLUSTER_CONFIG to avoid confusion.

* Add special treatment for ClusterConfig

This diff allows callers of getChangeType to iterate over the result of getChangeType() by changing determinePropertyMapByType so that it just returns an empty map for ClusterConfig.

* Record the replica objects in the AssignableNode in addition to the partition name (#440)

The replica instances are required while the rebalance algorithm generating ResourceAssignment based on the AssignableNode instances.
Refine the methods of the AssignableNode for better code style and readability.
Also, modify the related test cases to verify state information and new methods.

* Add BucketDataAccessor for large writes

For the new WAGED rebalancer, it's necessary to have a data accessor that will allow writes of data exceeding 1MB. ZooKeeper's ZNode size is capped at 1MB, so BucketDataAccessor interface and ZkBucketDataAccessor help us achieve this.
Changelist:
1. Add BucketDataAccessor and ZkBucketDataAccessor
2. Add necessary serializers
3. Add an integration test against ZK

* Implement the basic constraint based algorithm (#381)

Implement basic constraint algorithm: Greedy based, each time it picks the best scores given each replica and assigns the replica to the node. It doesn't guarantee to achieve global optimal but local optimal result

The algorithm is based on a given set of constraints

* HardConstraint: Approve or deny the assignment given its condition, any assignment cannot bypass any "hard constraint"
* SoftConstraint: Evaluate the assignment by points/rewards/scores, a higher point means a better assignment
The goal is to avoid all "hard constraints" while accumulating the most points(rewards) from "soft constraints"

* Validate the instance capacity/partition weight configuration while constructing the assignable instances (#451)

Compare the configure items with the required capacity keys that are defined in the cluster config when build the assignable instances.
- According to the design, all the required capacity keys must appear in the instance capacity config.
- As for the partition weights, the corresponding weight item will be filled with value 0 if the required capacity key is not specified in the resource config.

* Implement the WAGED rebalancer with the limited functionality. (#443)

The implemented rebalancer supports basic rebalance logic. It does not contain the logic to support delayed rebalance and user-defined preference list.

Added unit test to cover the main workflow of the WAGED rebalancer.

* HardConstraints Implementation and unit tests (#433)

* Implement all of basic Hard Constraints
1. Partitions count cannot exceed instance's upper limit
2. Fault zone aware (no same partitions on the same zone)
3. Partitions weight cannot exceed instance's capacity
4. Cannot assign inactived partitions
5. Same partition of different states cannot co-exist in one instance
6. Instance doesn't have the tag of the replica

* Implement AssignmentMetadataStore (#453)

Implement AssignmentMetadataStore

AssignmentMetadataStore is a component for the new WAGED Rebalaner. It provides APIs that allows the rebalancer to read and write the baseline and best possible assignments using BucketDataAccessor.

Changelist:
1. Add AssignmentMetadataStore
2. Add an integration test: TestAssignmentMetadataStore

* Fix TestWagedRebalancer and add constructor in AssignmentMetadataStore

TestWagedRebalancer was failing because it was not using a proper HelixManager to instantiate a mock version of AssignmentMetadataStore. This diff refactors the constructors in AssignmentMetadataStore and fixes the failing test.

* Implement one of the soft constraints (#450)

Implement Instance Partitions Count soft constraint.
Evaluate by instance's current partition count versus estimated max partition count.
Intuitively, Encourage the assignment if the instance's occupancy rate is below average;
Discourage the assignment if the instance's occupancy rate is above average.

The final normalized score will be within [0, 1].
The implementation of the class will depend on the cluster current total partitions count as the max score.

* Add soft constraint: ResourcetopStateAntiAffinityConstraint (#465)

Add ResourcetopStateAntiAffinityConstraint

The more total top state partitions assigned to the instance, the lower the score, vice versa.

* Implement MaxCapacityUsageInstanceConstraint soft constraint (#463)

The constraint evaluates the score by checking the max used capacity key out of all the capacity
keys.
The higher the maximum usage value for the capacity key, the lower the score will be, implying
that it is that much less desirable to assign anything on the given node.
It is a greedy approach since it evaluates only the most used capacity key.

* Add soft constraint: ResourcePartitionAntiAffinityConstraint (#464)

If the resource of the partition overall has a light load on the instance, the score is higher compared to case when the resource is heavily loaded on the instance

* Improve ResourceTopStateAntiAffinityConstraint (#475)

- fix the min max range to be [0,1]
- add unit test for normalized score

* Adjust the expected replica count according to fault zone count. (#476)

The rebalancer should determine the expected replica count according to the fault zone instead of the node count only.

* PartitionMovementSoftConstraint Implementation (#474)

Add soft constraint: partition movement constraint

Evaluate the proposed assignment according to the potential partition movements cost.
The cost is evaluated based on the difference between the old assignment and the new assignment.

* Add the remaining implementation of ConstraintBasedAlgorithmFactory (#478)

Implementation of ConstraintBasedAlgorithmFactory and the soft constraint weight model.
Remove SoftConstraintWeightModel class.
Get the rebalance preference and adjust the corresponding weight.
Pass the preference keys instead of cluster config.

* Integrate the WAGED rebalancer with all the related components. (#466)

1. Integrate with the algorithm, assignment metadata store, etc. Fix several conflicting interfaces and logics so as to all the rebalancer run correctly.
2. Complete OptimalAssignment.
3. Add integration tests to ensure the correctness of rebalancing logic.

* Separate AssignableNode properties by Immutable and Mutable (#485)

Make AssignableNode properties different by Immutable and Mutable
- It helps detect any wrong usage of these properties early

* Enable maintenance mode for the WAGED rebalancer.

The maintenance mode rebalance logic keeps the same as the previous feature.
Add more tests about partition migration and node swap that requires maintenance mode.

* Add delayed rebalance and user-defined preference list features to the WAGED rebalancer. (#456)

- Add delayed rebalance and user-defined preference list features to the WAGED rebalancer.
- Refine the delayed rebalance usage in the waged rebalancer.
- Add the delayed rebalance scheduling logic.
- Add the necessary tests. And fix TestMixedModeAutoRebalance and all delayed rebalance tests.

* Adjust the topology processing logic for instance to ensure backward compatibility.

* Load soft constraint weight from resources/properties file (#492)

Load the soft constraint's weight from a properties file.
It makes easier for us to adjust weights in the future.

* Add latency metric components for WAGED rebalancer (#490)

Add WAGED rebalancer metric framework and latency metric implementation

Changelist:
1. Add WAGED rebalancer metric interface
2. Implement latency-related metrics
3. Integrate latency metrics into WAGED rebalancer
4. Add tests

* Fixing rebalance cache issue and stablize the tests. (#510)

1. Fix the DelayedAutoRebalancer Cache issue that ClusterConfig change won't trigger rebalance. The current workaround in our code blocks the WAGED rebalancer logic. So we need to fix it while merging the WAGED rebalancer code.
2. Refine the ResourceChangeDetector's usage in the WAGED rebalancer so as to avoid unnecessary global rebalance.
3. Extend the StrictMatchExternalViewVerifier so it can be used to test the WAGED rebalance feature.

* More strict partition weight validation while creating the cluster model. (#511)

1. If any capacity key is not configured in the Resource Config (or default weight) as the partition weight, the config is invalid.
2. If any partition weight is configured with a negative number, the config is invalid.
Note that the rebalancer will not compute a new assignment if any capacity/weight config is invalid.

* Increase parallelism for ZkBucketDataAccessor (#506)

* Increase parallelism for ZkBucketDataAccessor

This diff improves parallelism and throughput for ZkBucketDataAccessor. It implements the following ideas:
1. Optimistic Concurrency Control
2. Monotonically Increasing Version Number
3. Garbage Collection of Stale Metadata
4. Retrying Reads Upon Failure

* The WAGED rebalancer returns the previously calculated assignment on calculation failure (#514)

* The WAGED rebalancer returns the previously calculated assignment on calculation failure.

This is to protect the cluster assignment on a rebalancing algorithm failure. For example, the cluster is out of capacity. In this case, the rebalancer will keep using the previously calculated mapping.
Also, refine the new metric interface, and add the RebalanceFailureCount metric for recording the failures.

Modify the test cases so that DBs from different test cases have a different name. This is to avoid previous test records to be returned by the rebalancer on calculation error.

* Make log clearer after finishing calculateAssignment. (#531)

Make log clearer after finishing calculateAssignment.

* Implement monitoring mbeans for the WAGED rebalancer. (#525)

Change list:
1. GlobalBaselineCalcCounter: Counter of the global rebalance.
2. PartialRebalanceCounter: Counter of the partial rebalance done.
3. BaselineDivergenceGauge: Gauge of the difference at replica level between the Baseline and the Best Possible assignments.

* Refine the rebalance scope calculating logic in the WAGED rebalancer. (#519)

* Refine the rebalane scope calculating logic in the WAGED rebalancer.

1. Ignore the IdealState mapping/listing fields if the resource is in FULL_AUTO mode.
2. On IdealState change, the resource shall be fully rebalanced since some filter conditions might be changed. Such as instance tag.
3. Live instance change (node newly connected) shall trigger full rebalance so partitions will be re-assigned to the new node.
4. Modify the related test cases.
5. Adding an option to the change detector so if it is used elsewhere, the caller has an option to listen to any change.

* Make WagedRebalancer static by creating a ThreadLocal (#540)

ZKBucketDataAccessor has a GC logic, but this is only valid if the ZkClient inside it is active and not closed. Currently, WAGED rebalancer generates an instance of AssignmentMetadataStore every time it rebalances, which does not allow the internal ZkBucketDataAccessor to garbage collect the assignment metadata it wrote previously.

This diff makes the entire WagedRebalancer object a ThreadLocal, which has the effect of making it essentially static across different runs of the pipeline.

* Change change detector to a regular field in the WAGED rebalancer instead of static threadlocal. (#543)

* Change change detector to regular field instead of static thread-local.

The rebalance has been modified to be a thread-local object. So there is no need to keep the change detector as thread-local.
This may cause potential problems.
In addition, in order to avoid resource leakage, implement the finalize method of the WagedRebalancer to close all connections.

* Refactor soft constraints to simply the algorithm and fix potential issues. (#520)

* Refactor soft constraints to simply the algorithm and fix potential issues.

1. Check for zero weight so as to avoid unnecessary calculations.
2. Simply the soft constraint interfaces and implementations. Avoid duplicate code.
3. Adjust partition movements constraint logic to reduce the chance of moving partition when the baseline and best possible assignment diverge.
4. Estimate utilization in addition to the other usage estimation. The estimation will be used as a base when calculating the capacity usage score. This is to ensure the algorithm treats different clusters with different overall usage in the same way.
5. Fix the issue that high utilization calculation does not consider the current proposed replica usage.
6. Use Sigmoid to calculate usage-based soft constraints score. This enhances the assignment result of the algorithm.
7. Adjust the related test cases.

* Minor fix for the constraints related tests. (#545)

Minor fix for the constraints related tests.

* Adjust the replica rebalance calculating ordering to avoid static order. (#535)

* Adjust the replica rebalance calculating ordering to avoid static order.

The problem of a static order is that the same set of replicas will always be the ones that are moved or state transited during the rebalance.
This randomize won't change the algorithm's performance. But it will help the Helix to eliminate very unstable partitions.

* Implement increment() method in CountMetric class. (#537)

Abstract method increaseCount() in CountMetric is a generic method used in inherited classes. We should implement this method in CountMetric to reduce duplicate code in inherited classes.
Change list:
1. Move increaseCount() to CountMetric.
2. Change the name to increment() and implement the method.

* Modify the ivy file to add the new math3 lib dependency. (#546)

Modify the ivy file to add the new math3 lib dependency.

* Fix a missing parameter when the WAGED rebalancer init the change detector. (#547)

This parameter was missed during the previous change.

* Add the new Rebalancer monitor domain to the active domain list. (#550)

Add the new Rebalancer monitor domain to the active domain list.

* Refine ivy file config. The org were not configured correctly. (#551)

* Use a deep copy of the new best possible assignment for measuring baseline divergence. (#542)

The new assignment is really critical in waged rebalancer. If there is any potential changes in measure baseline divergence, waged rebalancer may not work correctly.
To avoid changes of the new assignment and make it safe when being used to measure baseline divergence, use a deep copy of the new assignment.

* Add max capacity usage metric for instance monitor. (#548)

We need to monitor instance's max utilization in purpose of understanding what the max capacity usage is and knowing the status of the instance.

Change list:
1. Change instance monitor to extend dynamic metric, and change code logic in ClusterStatusMonitor to adapt the InstanceMonitor changes.
2. Add APIs for get/update MaxCapacityUsage.
3. Add an API in cluster status monitor to update max capacity usage.
4. Add unit tests for instance monitor and updateing max capacity usage.

* Fix formula incorrection in the comment for measuring baseline divergence. (#559)

Fix incorrect formula in the comment for measuring baseline divergence.

* Avoid redundant writes in AssignmentMetadataStore (#564)

For the WAGED rebalancer, we persist the cluster's mapping via AssignmentMetadataStore every pipeline. However, if there are no changes made to the new assignment from the old assignment, this write is not necessary. This diff checks whether they are equal and skips the write if old and new assignments are the same.

* Filter resource map with ideal states for instance capacity metrics. (#574)

ResourceToReblance map also has resources from current states. And this causes null pointer exceptions at parsing all replicas stage when the resource is not in ideal states. This diff fixes the issue by only using the resources in ideal states to parse all replicas.

* Introduce Dry-run Waged Rebalancer for the verifiers and tests. (#573)

Use a dry-run rebalancer to avoid updating the persisted rebalancer status in the verifiers or tests.
Also, refine several rebalancer related interfaces so as to simplify the dry-run rebalancer implementation.
Convert the test cases back to use the BestPossibleExternalViewVerifier.

Additional fixing:
- Updating the rebalancer preference for every rebalancer.compute calls. Since the preference might be updated at runtime.
- Fix one minor metric domain name bug in the WagedRebalancerMetricCollector.
- Minor test case fix to make them more stable after the change.

* Change ClusterConfig.setDefaultCapacityMap to be private. (#590)

Change ClusterConfig.setDefaultCapacityMap to be private.

* Add Java API for adding and validating resources for WAGED rebalancer (#570)

Add Java API methods for adding and validating resources for WAGED rebalancer. This is a set of convenience APIs provided through HelixAdmin the user could use to more easily add resources and validate them for WAGED rebalance usage.
Changelist:
1. Add API methods in HelixAdmin
2. Implement the said methods
3. Add tests

* Change calculation for baseline divergence. (#598)

Change the calculation for baseline divergence: 0.0 means no difference, 1.0 means all are different.

* Improve the WAGED rebalancer performance. (#586)

This change improves the rebalance's speed by 2x to 5x depends on the host capacity.

Parallelism the loop processing whenever possible and help to improve the performance. This does not change the logic.
Avoid some duplicate logic in the loop. Put the calculation outside the loop and only do it once.

* Fix the unstable test TestZeroReplicaAvoidance. (#603)

Fix the unstable test TestZeroReplicaAvoidance by waiting.
This is a temporary resolution before we fix issue #526. Marked it in the TODO comment so easier for us to remove the wait in batch later.

* Add REST API endpoints for WAGED Rebalancer (#611)

We want to make WAGED rebalancer (weight-aware) easier to use. One way to do this is to allow the user to easily add resources with weight configuration set by providing REST endpoints. This change adds the relevant REST endpoints based on the HelixAdmin APIs added in (#570).

Basically, this commit uses existing REST endpoints whose hierarchy is defined by REST resource. What this commit does to the existing endpoints is 1) Add extra commands 2) Add a WAGED command as a QueryParam so that WAGED logic could be included.

This change is backward-compatible because it keeps the original behavior when no commands are provided by using @DefaultValue annotation.

* Fix a potential issue in the ResourceChangeSnapshot. (#635)

The trim logic in the ResourceChangeSnapshot for cleaning up the IdealState should not clear the whole map. This will cause the WAGED rebalancer ignores changes such as new partitions into the partition list.
Modify the test case accordingly.

* Simply and enhance the RebalanceLatencyGauge so it can be used in multi-threads. (#636)

The previous design of RebalanceLatencyGauge won't support asynchronous metric data emitting. This PR adds support by using a ThreadLocal object.
The metric logic is not changed.

* Add new WAGED rebalancer config item "GLOBAL_REBALANCE_ASYNC_MODE". (#637)

This option will be used by the WAGED rebalancer to determine if the global rebalance should be performed asynchronously.

* Decouple the event type and the scheduled rebalance cache refresh option. (#638)

The previous design is that both on-demand and periodic rebalance scheduling task will request for a cache refresh. This won't be always true moving forward.
For example, the WAGED rebalancer async baseline calculating requests for a scheduled rebalance. But cache refresh won't be necessary.
This PR does not change any business logic. It prepares for future feature change.
This PR ensures strict backward compatibility.

* Improve the algorithm so it prioritizes the assignment to the idle nodes when the constraint evaluation results are the same (#651)

This is to get rid of the randomness when the algorithm result is a tie. Usually, when the algorithm picks up the nodes with the same score, more partition movements will be triggered on a cluster change.

* Refine the WAGED rebalancer to minimize the partial rebalance workload. (#639)

* Refine the WAGED rebalancer to minimize the partial rebalance workload.

Split the cluster module calculation method so that different rebalance logic can have different rebalance scope calculation logic.
Also, refine the WAGED rebalancer logic to reduce duplicate code.

* Refine methods name and comments. (#664)

* Refine methods name and comments.

* Asynchronously calculating the Baseline (#632)

* Enable the Baseline calculation to be asynchronously done.

This will greatly fasten the rebalance speed. Basically, the WAGED rebalancer will firstly partial rebalance to recover the invalid replica allocations (for example, the ones that are on a disabled instance). Then it calculates the new baseline by global rebalancing.

* Reorgnize the test case so the new WAGED expand cluster tests are not skipped. (#670)

TestNG cannot handle test classes inheritance well. Some of the tests are skipped with the current design. Move the logic to the new test class so it is no longer a child of another test class. This ensures all the test cases are running.

* Fix the Helix rest tests by cleaning up the environment before testing. (#679)

The validateWeight test methods in TestInstanceAccessor and TestPreInstanceAccessor are testing against the same instance config fields. There was a conflict if both of the test cases are executed in a certain order. This change adds cleanup logic so the shared fields will be empty before each test method starts.

* Add instance capacity gauge (#557)

We need to monitor instance utilization in purpose of understanding what the instance capacity is.

Change list:
- Change instance monitor to update capacity
- Change getAttribute to throw AttributeNotFoundException in DynamicMBeanProvider
- Combine max usage and instance capacity update into one method in cluster status monitor
- Add unit test

* Add resource partition weight gauge (#686)

We would like to monitor the usage of each capacity for the resource partitions: gauge of the average partition weight for each CAPACITY key.

Change list:
- Add partition weight gauge metric to resource monitor.
- Add two unit tests to cover new code.

* Add WAGED rebalancer reset method to clean up cached status. (#696)

The reset method is for cleaning up any in-memory records within the WAGED rebalancer so we don't need to recreate one.

Detailed change list:
1. Add reset methods to all the stateful objects that are used in the WAGED rebalancer.
2. Refine some of the potential race condition in the WAGED rebalancer components.
3. Adjust the tests accordingly. Also adding new tests to cover the components reset / the WAGED rebalancer reset logic.

* Reset the WAGED rebalancer once the controller newly acquires leadership. (#690)

This is to prevent any cached assignment information which is recorded during the previous session from impacting the rebalance result.
Detailed change list:

Move the stateful WAGED rebalancer to the GenericHelixController object instead of the rebalance stage. This is for resolving the possible race condition between the event processing thread and leader switch handling thread.
Adding a new test regarding leadership switch to verify that the WAGED rebalancer has been reset after the processing.

Co-authored-by: Hunter Lee <narendly@gmail.com>
Co-authored-by: Yi Wang <ywang4@linkedin.com>
Co-authored-by: Huizhi Lu <hulu@linkedin.com>
149 files changed
tree: 9718878c630f4d994efb5f2e3161e7c6e946ebe3
  1. helix-admin-webapp/
  2. helix-agent/
  3. helix-core/
  4. helix-front/
  5. helix-rest/
  6. recipes/
  7. scripts/
  8. website/
  9. .gitignore
  10. build
  11. bump-up.command
  12. deploySite.sh
  13. helix-style-intellij.xml
  14. helix-style.xml
  15. hpost-review.sh
  16. LICENSE
  17. NOTICE
  18. pom.xml
  19. README.md
README.md

Apache Helix

Helix is part of the Apache Software Foundation.

Project page: http://helix.apache.org/

Mailing list: http://helix.apache.org/mail-lists.html

Build

mvn clean install package -DskipTests

WHAT IS HELIX

Helix is a generic cluster management framework used for automatic management of partitioned, replicated and distributed resources hosted on a cluster of nodes. Helix provides the following features:

  1. Automatic assignment of resource/partition to nodes
  2. Node failure detection and recovery
  3. Dynamic addition of Resources
  4. Dynamic addition of nodes to the cluster
  5. Pluggable distributed state machine to manage the state of a resource via state transitions
  6. Automatic load balancing and throttling of transitions