Every algorithm phase is described in the following main sections: Trigger, Steps or Pseudocode, and (where applicable) Result.
An operation that can trigger a rebalance has occurred:
Write new replicas configuration number to table config (effectively from 1 node)
OR
Write new partitions configuration number to table config (effectively from 1 node)
Write new assignments' intention to metastore (effectively from 1 node in cluster)
Start new raft nodes. Initiate/update asynchronous change peer request to raft group (effectively from 1 node per partition)
Stop all redundant nodes. Change stable partition assignment to the new one and finish rebalance process.
For further steps, we should introduce some new metastore keys:
partition.assignments.stable - the list of peers that currently process operations for a partition.
partition.assignments.pending - the queue of peer lists to which the current rebalance moves the partition. A queue is needed when one rebalance requires multiple configuration switches.
partition.assignments.planned - the list of peers that will be used for the next rebalance once the current one has finished.
Also, we will need the utility key:
partition.change.trigger - the key that ensures an assignments update trigger event is processed only once; it stores the event timestamp.
The following types of events can trigger the rebalance:
org.apache.ignite.configuration.schemas.table.TableChange.changeReplicas produces a metastore update event
org.apache.ignite.configuration.schemas.table.TableChange.changePartitions produces a metastore update event (IMPORTANT: this type of trigger has additional difficulties because of cross raft group data migration, and it is out of scope of this document)
Result: So, a change to one of two metastore keys will trigger the rebalance:
<tableScope>.replicas
<tableScope>.partitions // out of scope
Trigger: Metastore event about a change in <tableScope>.replicas (see org.apache.ignite.internal.table.distributed.TableManager.onUpdateReplicas)
Pseudocode:
onReplicaNumberChange:
    with table as event.table:
        for partition in table.partitions:
            <inline metastoreInvoke>

metastoreInvoke: // atomic metastore call through multi-invoke api
    if empty(partition.change.trigger) || partition.change.trigger < event.timestamp:
        if empty(partition.assignments.pending) && partition.assignments.stable != calcPartAssignments():
            partition.assignments.pending = partAssignmentsPendingQueue
            partition.change.trigger = event.timestamp
        else:
            if partition.assignments.pending != partAssignmentsPendingQueue:
                partition.assignments.planned = calcPartAssignments()
                partition.change.trigger = event.timestamp
            else:
                remove(partition.assignments.planned)
    else:
        skip
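The pseudocode above can be tried out with a small in-memory model. This is only a sketch under stated assumptions: `PartitionKeys` and `on_replicas_change` are hypothetical names, the pending queue is simplified to a single peer list, and the atomicity that the real multi-invoke call provides is not modelled.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class PartitionKeys:
    """Hypothetical in-memory stand-in for one partition's metastore keys."""
    stable: List[str] = field(default_factory=list)
    pending: Optional[List[str]] = None   # None models an empty key
    planned: Optional[List[str]] = None
    change_trigger: Optional[int] = None


def on_replicas_change(keys, event_timestamp, new_assignments):
    """Follow the branches of metastoreInvoke and report which one was taken."""
    if keys.change_trigger is not None and keys.change_trigger >= event_timestamp:
        return "skip"  # this event (or a newer one) was already processed
    if keys.pending is None and keys.stable != new_assignments:
        keys.pending = new_assignments
        keys.change_trigger = event_timestamp
        return "pending"
    if keys.pending != new_assignments:
        keys.planned = new_assignments
        keys.change_trigger = event_timestamp
        return "planned"
    keys.planned = None  # remove(partition.assignments.planned)
    return "planned-removed"


keys = PartitionKeys(stable=["A", "B"])
print(on_replicas_change(keys, 10, ["A", "B", "C"]))  # first change starts a rebalance
print(on_replicas_change(keys, 10, ["A", "B", "C"]))  # same event delivered again
print(on_replicas_change(keys, 11, ["A", "C"]))       # change during a running rebalance
```

The second call shows why partition.change.trigger exists: the same event delivered on several nodes changes the keys only once.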
Trigger: Metastore event about new partition.assignments.pending received (See corresponding listener for pending key in org.apache.ignite.internal.table.distributed.TableManager.registerRebalanceListeners)
Steps:
Start all new needed raft nodes: partition.assignments.pending / partition.assignments.stable
Initiate the asynchronous change peers request with RaftGroupService#changePeersAndLearnersAsync(leaderTerm, peers). RaftGroupService#changePeersAndLearnersAsync from old terms must be skipped.
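As an illustration of the two rules in these steps, the sketch below computes the set difference pending / stable and applies the "old terms must be skipped" check. Both function names are hypothetical, and comparing plain integers is an assumption about how terms are represented.

```python
def nodes_to_start(pending, stable):
    """Peers present in pending but not in stable, kept in pending order."""
    stable_set = set(stable)
    return [peer for peer in pending if peer not in stable_set]


def should_send_change_peers(request_term, current_leader_term):
    """Only requests issued under an older leader term are skipped."""
    return request_term >= current_leader_term


print(nodes_to_start(["A", "C", "D"], ["A", "B"]))
print(should_send_change_peers(request_term=2, current_leader_term=3))
```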
Trigger: When the leader has applied the new configuration with the list of resulting peers <applied peers>, it calls RebalanceRaftGroupEventsListener.onNewPeersConfigurationApplied(<applied peers>)
Pseudocode:
metastoreInvoke: // atomic
    partition.assignments.stable = appliedPeers
    if empty(partition.assignments.planned):
        partition.assignments.pending = empty
    else:
        partition.assignments.pending = partition.assignments.planned
        remove(partition.assignments.planned)
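A minimal sketch of this key switch, assuming a plain dict stands in for one partition's metastore keys and a missing entry models an empty key. The function name mirrors the listener method, but the code is illustrative only and does not model atomicity.

```python
def on_new_peers_configuration_applied(keys, applied_peers):
    """Make the applied peers stable and promote planned to pending, if any."""
    keys["stable"] = list(applied_peers)
    planned = keys.pop("planned", None)  # remove(partition.assignments.planned)
    if planned is None:
        keys.pop("pending", None)        # pending = empty: rebalance is finished
    else:
        keys["pending"] = planned        # the planned rebalance starts next
    return keys


keys = {"stable": ["A", "B"], "pending": ["A", "B", "C"], "planned": ["A", "C"]}
on_new_peers_configuration_applied(keys, ["A", "B", "C"])
print(keys)
```

Writing a non-empty pending key here is what chains rebalances: it produces a new pending event, so the previous phase runs again for the planned assignments.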
RebalanceRaftGroupEventsListener.onNewPeersConfigurationApplied(<applied peers>) is also responsible for updating the assignments of the table. When the assignments are updated, the corresponding listener is triggered (see org.apache.ignite.internal.table.distributed.TableManager.onUpdateAssignments), so raft clients will be updated.
After the stable key is updated, the corresponding listener for that change is called, so redundant raft nodes may be stopped (see the listener for the stable key in org.apache.ignite.internal.table.distributed.TableManager.registerRebalanceListeners)
Failover helpers
RebalanceRaftGroupEventsListener.onLeaderElected - must be executed on the new leader when the raft group has elected a new leader. We may also need to check whether a new lease has been received.
RebalanceRaftGroupEventsListener.onReconfigurationError - must be executed when any error occurs during RaftGroupService#changePeersAndLearnersAsync. For more information about the change peers process, and the catch-up process in particular, see modules/raft/tech-notes/changePeersAndLearners.md and modules/raft/tech-notes/nodeCatchUp.md
RebalanceRaftGroupEventsListener.onNewPeersConfigurationApplied(peers) - must be executed with the list of new peers when RaftGroupService#changePeersAndLearnersAsync has completed successfully.
Trigger: A node receives an update about the partition's stable assignments
Steps:
Stop redundant raft nodes (those not in the partition.assignments.pending + partition.assignments.stable set)
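The rule for redundant nodes can be written as a set check. This is a sketch only: `nodes_to_stop` and its `running_nodes` argument are hypothetical, and peers are modelled as plain strings.

```python
def nodes_to_stop(running_nodes, pending, stable):
    """Running raft nodes that are in neither the pending nor the stable set."""
    keep = set(pending) | set(stable)
    return [node for node in running_nodes if node not in keep]


# After a rebalance from [A, B] to [A, C] with nothing pending, B is redundant.
print(nodes_to_stop(["A", "B", "C"], pending=[], stable=["A", "C"]))
```

Keeping nodes from the pending set alive matters when a planned rebalance has already been promoted to pending at the moment the stable key changes.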
We need to provide a failover thread that can handle the following cases:
RaftGroupService#changePeersAndLearnersAsync can't even start the catch-up process, for instance because some new raft nodes haven't been started yet.
RaftGroupService#changePeersAndLearnersAsync failed to complete the catch-up, for example due to a catch-up timeout. For all possible error cases during the catch-up stage, see modules/raft/tech-notes/nodeCatchUp.md
We have the following mechanisms for handling these cases:
RebalanceRaftGroupEventsListener.onReconfigurationError, which schedules retries if needed.
A RaftGroupService#changePeersAndLearnersAsync call with a legacy term receives the appropriate answer from the leader, and retries for this partition stop.
On a leader change, RebalanceRaftGroupEventsListener.onLeaderElected is invoked and starts the needed RaftGroupService#changePeersAndLearnersAsync from the pending key.
It seems that the rebalance of the metastore can be handled by the same process, because:
During changePeers, the raft group can handle any other entries.
Also, the failover mechanism above doesn't use the metastore, only the raft term and special listeners.
The algorithm above seems to work well, but it has one serious caveat. When the leader is busy with the current changePeers, we can't start a new one. That's a big issue, because the data rebalance process can take a long time while all nodes sync raft logs with data. According to https://github.com/apache/ignite-3/blob/main/modules/raft/tech-notes/changePeersAndLearners.md we can update the peers' list relatively painlessly if the leader is still in the STAGE_CATCHING_UP phase. Alternatively, we can cancel the current changePeers if it is in the STAGE_CATCHING_UP phase and run a new one.
Approach 1. Update the peers' list of the current changePeers
This approach can be implemented in different ways, but let's describe the simplest one.
Introduce a new metastore key, partition.assignments.pending.lock.
Pseudocode:
metastoreInvoke: // atomic metastore call through multi-invoke api
    if empty(partition.change.trigger) || partition.change.trigger < event.timestamp:
        if empty(partition.assignments.pending):
            if partition.assignments.stable != calcPartAssignments():
                partition.assignments.pending = partAssignmentsPendingQueue
                partition.change.trigger = event.timestamp
            else:
                skip
        else:
            if empty(partition.assignments.pending.lock):
                partition.assignments.pending = partAssignmentsPendingQueue
                partition.change.trigger = event.timestamp
            else:
                partition.assignments.planned = calcPartAssignments()
                partition.change.trigger = event.timestamp
    else:
        skip
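The lock-aware variant can be modelled the same way. In this sketch a dict stands in for the partition's metastore keys, a missing entry models an empty key, the pending queue is simplified to a single peer list, and the function name is hypothetical.

```python
def on_replicas_change_with_lock(keys, event_timestamp, new_assignments):
    """Follow the lock-aware metastoreInvoke and report which branch was taken."""
    trigger = keys.get("change.trigger")
    if trigger is not None and trigger >= event_timestamp:
        return "skip"  # event already processed
    if keys.get("pending") is None:
        if keys.get("stable") == new_assignments:
            return "skip"  # nothing to rebalance
        keys["pending"] = new_assignments
        keys["change.trigger"] = event_timestamp
        return "pending"
    if not keys.get("pending.lock"):
        # Catch-up has not been finalized yet: retarget the running rebalance.
        keys["pending"] = new_assignments
        keys["change.trigger"] = event_timestamp
        return "pending-updated"
    # Pending is locked: queue the change for the next rebalance.
    keys["planned"] = new_assignments
    keys["change.trigger"] = event_timestamp
    return "planned"


keys = {"stable": ["A", "B"]}
print(on_replicas_change_with_lock(keys, 1, ["A", "B", "C"]))
print(on_replicas_change_with_lock(keys, 2, ["A", "C"]))
keys["pending.lock"] = True
print(on_replicas_change_with_lock(keys, 3, ["B", "C"]))
```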
Update the changePeers request-response behaviour with the following responses:
received - if there is no current changePeers or the current changePeers is in the STAGE_CATCHING_UP phase. Updates the catching-up peers with the new peers and stops redundant replicators, if needed.
busy - if the current leader is not in the STAGE_NONE or STAGE_CATCHING_UP phase.
Update the changePeers behaviour with a new listener from the caller, tryCatchUpFinish(peers). This listener must execute the following metastore call:
Pseudocode:
metastoreInvoke: // atomic metastore call through multi-invoke api
    if empty(partition.assignments.pending.lock):
        if peers == partition.assignments.pending:
            partition.assignments.pending.lock = true
            return true
        else:
            return false
If the listener returns false, we should await the new peer list, process it and try to call the listener again.
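A small sketch of the listener's metastore call, assuming a dict models the partition's keys. The pseudocode does not say what happens when the lock is already set; returning None for that case is an assumption made here for illustration.

```python
def try_catch_up_finish(keys, peers):
    """Lock the pending key if the caught-up peers still match it."""
    if keys.get("pending.lock"):
        return None  # not specified by the pseudocode; None is an assumption
    if peers == keys.get("pending"):
        keys["pending.lock"] = True
        return True
    return False


keys = {"pending": ["A", "C"]}
# The leader caught up an outdated peer list: the caller must retry later.
print(try_catch_up_finish(keys, ["A", "B", "C"]))
# After processing the new list, the same check succeeds and takes the lock.
print(try_catch_up_finish(keys, ["A", "C"]))
```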
Approach 2. Cancel the current changePeers and run a new one
Instead of updating the current changePeers with a new peers' list, we can cancel it and start a new one.
For this approach we will need:
A cancelChangePeersAndLearners() method. This method should cancel the current changePeers if and only if it is in the STAGE_CATCHING_UP phase. The method must return:
false - if changePeers is in progress and can't be cancelled (as in approach 1, when the leader is not in STAGE_CATCHING_UP/STAGE_NONE)
true - otherwise
A listener on the partition.assignments.planned key that, on update:
executes cancelChangePeersAndLearners() on the node with the partition leader. If it returns false - do nothing.
If it returns true - move planned peers to pending in the metastore.
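The cancel-based flow can be sketched as follows. `FakeLeader` and `on_planned_update` are hypothetical stand-ins, stages are modelled as plain strings, and "STAGE_JOINT" is used only as an example of a stage that cannot be cancelled.

```python
STAGE_NONE = "STAGE_NONE"
STAGE_CATCHING_UP = "STAGE_CATCHING_UP"


class FakeLeader:
    """Hypothetical stand-in for the partition leader's raft node."""

    def __init__(self, stage):
        self.stage = stage

    def cancel_change_peers_and_learners(self):
        if self.stage == STAGE_NONE:
            return True              # nothing is running
        if self.stage == STAGE_CATCHING_UP:
            self.stage = STAGE_NONE  # cancel the running changePeers
            return True
        return False                 # in progress and can't be cancelled


def on_planned_update(leader, keys):
    """React to an update of the planned key; True if planned became pending."""
    planned = keys.get("planned")
    if planned is None or not leader.cancel_change_peers_and_learners():
        return False                 # do nothing
    keys["pending"] = planned        # move planned peers to pending
    del keys["planned"]
    return True


keys = {"pending": ["A", "B", "C"], "planned": ["A", "C"]}
print(on_planned_update(FakeLeader(STAGE_CATCHING_UP), keys), keys)
```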