In the current RocketMQ Raft mode, the DLedger Commitlog replaces the original Commitlog, giving the Commitlog the ability to elect a leader and replicate. However, this approach also causes some problems:
Therefore, we plan to use DLedger to implement a Raft-based consistency module (the DLedger Controller) and use it as an optional leader election component. It can be deployed independently or embedded in the NameServer. Brokers elect their Master by interacting with the Controller, which solves the problems above. We call this new mode the Controller mode.
To unify the log replication process, mark the boundary of the logs replicated under each Master, and simplify log truncation, we introduce the concept of MasterEpoch, which represents the current Master's term number (similar to a Raft term).
Each Master has a MasterEpoch and a StartOffset, which represent its term number and the log offset at which its term starts, respectively.
Note that the MasterEpoch is assigned by the Controller and increases monotonically.
In addition, we have introduced the EpochFile, which is used to store the <Epoch, StartOffset> sequence.
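To make the <Epoch, StartOffset> sequence concrete, here is a minimal in-memory sketch of an EpochFile. The class and method names are hypothetical, and it assumes that an epoch ends where the next epoch starts (the latest epoch ends at the current max offset), which yields the <startOffset, endOffset> range that log truncation compares between Master and Slave.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the EpochFile: <Epoch, StartOffset> entries ordered by epoch.
final class EpochFileSketch {
    private final TreeMap<Integer, Long> entries = new TreeMap<>();

    // Appends a new term. Epochs must strictly increase; start offsets must not decrease.
    void append(int epoch, long startOffset) {
        Map.Entry<Integer, Long> last = entries.lastEntry();
        if (last != null && (epoch <= last.getKey() || startOffset < last.getValue())) {
            throw new IllegalArgumentException(
                    "out-of-order entry <" + epoch + ", " + startOffset + ">");
        }
        entries.put(epoch, startOffset);
    }

    // Assumed end offset of an epoch: the next epoch's StartOffset, or maxOffset for
    // the latest epoch. Returns -1 if the epoch is not in the file.
    long endOffset(int epoch, long maxOffset) {
        if (!entries.containsKey(epoch)) {
            return -1;
        }
        Map.Entry<Integer, Long> next = entries.higherEntry(epoch);
        return next == null ? maxOffset : next.getValue();
    }

    // Latest recorded epoch, or 0 if the file is empty (assumption).
    int lastEpoch() {
        return entries.isEmpty() ? 0 : entries.lastKey();
    }
}
```

Keeping the entries in a TreeMap makes "which epoch comes next" a single higherEntry lookup, which is all that is needed to derive an epoch's end offset from the stored start offsets.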
When a Broker becomes the Master, it will:
When a Broker becomes the Slave, it will:
Ready stage:
Handshake stage:
Transfer stage:
The specific log truncation algorithm flow is as follows:
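```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.IntFunction;

// Slave side. epochMap: epoch -> Pair<startOffset, endOffset> (RocketMQ's Pair, getObject1()/getObject2()).
// findMasterOffsetByEpoch: returns the Master's <startOffset, endOffset> for an epoch, or null if absent.
// Returns the offset to truncate to, or -1 if the Slave shares no epoch with the Master.
static long findTruncateOffset(TreeMap<Integer, Pair<Long, Long>> epochMap,
                               IntFunction<Pair<Long, Long>> findMasterOffsetByEpoch) {
    long truncateOffset = -1;
    // Walk the epochs from largest to smallest
    for (Map.Entry<Integer, Pair<Long, Long>> curEntry : epochMap.descendingMap().entrySet()) {
        Pair<Long, Long> slaveOffset = curEntry.getValue();
        Pair<Long, Long> masterOffset = findMasterOffsetByEpoch.apply(curEntry.getKey());
        // The first epoch with the same startOffset on both sides is the last common epoch
        if (masterOffset != null && slaveOffset.getObject1().equals(masterOffset.getObject1())) {
            truncateOffset = Math.min(slaveOffset.getObject2(), masterOffset.getObject2());
            break;
        }
    }
    return truncateOffset;
}
```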
Because HA replicates logs as a stream, log boundaries cannot be distinguished (a single transmitted batch may span multiple MasterEpochs), so the Slave cannot detect MasterEpoch changes or update its EpochFile in time.
Therefore, we have made the following improvements:
When the Master transfers logs, it ensures that each batch it sends belongs to a single epoch instead of spanning multiple epochs. To do this, we add two variables to WriteSocketService: currentTransferEpoch, the epoch whose logs are currently being transferred, and currentTransferEpochEndOffset, the end offset of that epoch.
When WriteSocketService transfers the next batch of logs (with a total size of size), if nextTransferFromWhere + size > currentTransferEpochEndOffset, it sets the limit of selectMappedBufferResult so that the batch ends at currentTransferEpochEndOffset. Once the current epoch has been fully transferred, it advances currentTransferEpoch and currentTransferEpochEndOffset to the next epoch.
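The clamping rule can be sketched as a small model. This is not WriteSocketService itself: the class EpochBoundedTransfer, its methods, the TreeMap of epoch start offsets, and treating the latest epoch as open-ended (Long.MAX_VALUE) are assumptions; nextTransferFromWhere, currentTransferEpoch and currentTransferEpochEndOffset mirror the names in the text.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: keep every HA batch inside a single MasterEpoch.
final class EpochBoundedTransfer {
    private final TreeMap<Integer, Long> epochStarts; // epoch -> StartOffset, as in the EpochFile
    private long nextTransferFromWhere;
    private int currentTransferEpoch;
    private long currentTransferEpochEndOffset;

    EpochBoundedTransfer(TreeMap<Integer, Long> epochStarts, long startFrom) {
        this.epochStarts = epochStarts;
        this.nextTransferFromWhere = startFrom;
        locateEpoch(startFrom);
    }

    // Finds the epoch containing `offset`; the latest epoch is treated as open-ended.
    private void locateEpoch(long offset) {
        Map.Entry<Integer, Long> found = null;
        for (Map.Entry<Integer, Long> e : epochStarts.entrySet()) {
            if (e.getValue() > offset) {
                break;
            }
            found = e;
        }
        if (found == null) {
            throw new IllegalStateException("offset " + offset + " precedes the first epoch");
        }
        currentTransferEpoch = found.getKey();
        Map.Entry<Integer, Long> next = epochStarts.higherEntry(found.getKey());
        currentTransferEpochEndOffset = next == null ? Long.MAX_VALUE : next.getValue();
    }

    // Clamps a batch of `size` readable bytes so it never crosses currentTransferEpochEndOffset.
    int nextBatchSize(int size) {
        if (nextTransferFromWhere + size > currentTransferEpochEndOffset) {
            return (int) (currentTransferEpochEndOffset - nextTransferFromWhere);
        }
        return size;
    }

    // Advances the transfer position and moves to the next epoch once the boundary is reached.
    void onBatchSent(int sent) {
        nextTransferFromWhere += sent;
        if (nextTransferFromWhere >= currentTransferEpochEndOffset) {
            locateEpoch(nextTransferFromWhere);
        }
    }

    int epoch() {
        return currentTransferEpoch;
    }
}
```

For example, starting at offset 80 when epoch 2 begins at offset 100, a 64-byte read is cut to 20 bytes, and the following batch is tagged with epoch 2.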
Correspondingly, when the Slave receives logs and detects an epoch change in the packet header, it records the new epoch and its start offset in the local EpochFile.
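A minimal sketch of the Slave side, assuming the Slave compares the epoch and epochStartOffset fields of each transfer header against its last local entry. SlaveEpochTracker is a hypothetical name, the TreeMap stands in for the on-disk EpochFile, and rejecting an epoch older than the local one is an assumption.

```java
import java.util.TreeMap;

// Hypothetical Slave-side handler for the epoch fields of each transfer packet header.
final class SlaveEpochTracker {
    private final TreeMap<Integer, Long> epochFile = new TreeMap<>(); // epoch -> StartOffset

    // Returns true when the header starts a new epoch and an entry was recorded.
    boolean onTransferHeader(int epoch, long epochStartOffset) {
        if (!epochFile.isEmpty()) {
            int lastEpoch = epochFile.lastKey();
            if (epoch < lastEpoch) {
                // Assumption: an epoch older than the local one indicates a stale Master.
                throw new IllegalStateException("stale epoch " + epoch + " < " + lastEpoch);
            }
            if (epoch == lastEpoch) {
                return false; // same epoch as the previous batch, nothing to record
            }
        }
        epochFile.put(epoch, epochStartOffset);
        return true;
    }

    Long startOffsetOf(int epoch) {
        return epochFile.get(epoch);
    }
}
```

Because the Master never lets a batch span two epochs, checking the header once per batch is enough for the Slave to keep its EpochFile in step with the Master's.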
As described above, the AutoSwitchHaService protocol divides log replication into multiple stages. The packet formats used by the HaService are as follows.
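The stages can be modeled as a small state machine. The sketch is hypothetical: the enum name is made up, the stage names follow the Ready, Handshake and Transfer stages in this document, and falling back to READY on any failure is an assumption rather than documented behavior.

```java
// Hypothetical model of the replication stages of one HA connection.
enum ReplicationStage {
    READY, HANDSHAKE, TRANSFER;

    // Stage entered after the current one completes successfully.
    ReplicationStage onSuccess() {
        switch (this) {
            case READY:
                return HANDSHAKE;
            case HANDSHAKE:
                return TRANSFER;
            default:
                return TRANSFER; // stays in TRANSFER while logs keep flowing
        }
    }

    // Assumption: any failure (for example a broken connection) restarts from READY.
    ReplicationStage onFailure() {
        return READY;
    }
}
```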
Handshake stage:
1. AutoSwitchHaClient (Slave) sends a handshake packet to the Master as follows:
current state(4byte) + Two flags(4byte) + slaveAddressLength(4byte) + slaveAddress(50byte)
Current state: the current HAConnectionState, which is HANDSHAKE.
Two flags: two status flags. isSyncFromLastFile indicates whether replication starts from the Master's last file, and isAsyncLearner indicates whether the Slave replicates asynchronously and joins the Master as a Learner.
slaveAddressLength and slaveAddress: the address of the Slave, which is later used to join the SyncStateSet.
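A sketch of encoding and decoding this 62-byte packet with java.nio.ByteBuffer (big-endian by default). The class is hypothetical and the numeric value of the state field is simply passed in as an int; splitting the 4-byte flags field into two 2-byte shorts and zero-padding the fixed 50-byte address field are assumptions about the layout.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec for the Slave's handshake packet.
final class SlaveHandshake {
    static final int ADDRESS_FIELD_BYTES = 50;
    static final int PACKET_BYTES = 4 + 4 + 4 + ADDRESS_FIELD_BYTES; // 62 bytes

    final int state;
    final boolean isSyncFromLastFile;
    final boolean isAsyncLearner;
    final String slaveAddress;

    SlaveHandshake(int state, boolean isSyncFromLastFile, boolean isAsyncLearner, String slaveAddress) {
        this.state = state;
        this.isSyncFromLastFile = isSyncFromLastFile;
        this.isAsyncLearner = isAsyncLearner;
        this.slaveAddress = slaveAddress;
    }

    byte[] encode() {
        byte[] addr = slaveAddress.getBytes(StandardCharsets.UTF_8);
        if (addr.length > ADDRESS_FIELD_BYTES) {
            throw new IllegalArgumentException("slaveAddress exceeds " + ADDRESS_FIELD_BYTES + " bytes");
        }
        ByteBuffer buf = ByteBuffer.allocate(PACKET_BYTES);
        buf.putInt(state);
        // Assumption: the 4-byte flags field holds two 2-byte shorts, 1 = true.
        buf.putShort((short) (isSyncFromLastFile ? 1 : 0));
        buf.putShort((short) (isAsyncLearner ? 1 : 0));
        buf.putInt(addr.length);
        buf.put(addr); // the unused tail of the 50-byte field stays zero
        return buf.array();
    }

    static SlaveHandshake decode(byte[] packet) {
        ByteBuffer buf = ByteBuffer.wrap(packet);
        int state = buf.getInt();
        boolean syncFromLastFile = buf.getShort() != 0;
        boolean asyncLearner = buf.getShort() != 0;
        int addressLength = buf.getInt();
        if (addressLength < 0 || addressLength > ADDRESS_FIELD_BYTES) {
            throw new IllegalArgumentException("bad slaveAddressLength " + addressLength);
        }
        byte[] addr = new byte[addressLength];
        buf.get(addr);
        return new SlaveHandshake(state, syncFromLastFile, asyncLearner,
                new String(addr, StandardCharsets.UTF_8));
    }
}
```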
2. AutoSwitchHaConnection (Master) sends a handshake packet back to the Slave as follows:
current state(4byte) + body size(4byte) + offset(8byte) + epoch(4byte) + body
Current state: the current HAConnectionState, which is HANDSHAKE.
Body size: the length of the body.
Offset: the maximum log offset on the Master side.
Epoch: the Master's epoch.

After the Slave receives the packet sent back by the Master, it performs the log truncation process described above locally.
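The 20-byte header of this response can be handled the same way. In this hypothetical sketch the body is treated as opaque bytes, and big-endian ByteBuffer order is again an assumption.

```java
import java.nio.ByteBuffer;

// Hypothetical codec for the Master's handshake response: 20-byte header followed by the body.
final class MasterHandshakeResponse {
    static final int HEADER_BYTES = 4 + 4 + 8 + 4; // state + bodySize + offset + epoch

    final int state;
    final int bodySize;
    final long masterMaxOffset;
    final int masterEpoch;

    MasterHandshakeResponse(int state, int bodySize, long masterMaxOffset, int masterEpoch) {
        this.state = state;
        this.bodySize = bodySize;
        this.masterMaxOffset = masterMaxOffset;
        this.masterEpoch = masterEpoch;
    }

    static byte[] encode(int state, long masterMaxOffset, int masterEpoch, byte[] body) {
        return ByteBuffer.allocate(HEADER_BYTES + body.length)
                .putInt(state)
                .putInt(body.length)
                .putLong(masterMaxOffset)
                .putInt(masterEpoch)
                .put(body)
                .array();
    }

    // Reads the header and leaves the buffer positioned at the start of the body.
    static MasterHandshakeResponse decodeHeader(ByteBuffer buf) {
        return new MasterHandshakeResponse(buf.getInt(), buf.getInt(), buf.getLong(), buf.getInt());
    }
}
```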
Transfer stage:
1. AutoSwitchHaConnection (Master) continuously sends log packets to the Slave as follows:
current state(4byte) + body size(4byte) + offset(8byte) + epoch(4byte) + epochStartOffset(8byte) + additionalInfo(confirmOffset) (8byte)+ body
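A hypothetical codec for the fixed 36-byte header of each log packet (4 + 4 + 8 + 4 + 8 + 8 bytes), again assuming big-endian ByteBuffer order; confirmOffset occupies the additionalInfo slot, and bodySize bytes of logs would follow the header.

```java
import java.nio.ByteBuffer;

// Hypothetical codec for the header of a Transfer-stage log packet.
final class TransferHeader {
    static final int HEADER_BYTES = 4 + 4 + 8 + 4 + 8 + 8; // 36 bytes

    final int state;
    final int bodySize;
    final long offset;           // starting offset of this batch
    final int epoch;             // MasterEpoch of this batch
    final long epochStartOffset; // StartOffset of that epoch
    final long confirmOffset;    // carried in the additionalInfo slot

    TransferHeader(int state, int bodySize, long offset, int epoch,
                   long epochStartOffset, long confirmOffset) {
        this.state = state;
        this.bodySize = bodySize;
        this.offset = offset;
        this.epoch = epoch;
        this.epochStartOffset = epochStartOffset;
        this.confirmOffset = confirmOffset;
    }

    void writeTo(ByteBuffer buf) {
        buf.putInt(state).putInt(bodySize).putLong(offset)
                .putInt(epoch).putLong(epochStartOffset).putLong(confirmOffset);
    }

    static TransferHeader readFrom(ByteBuffer buf) {
        if (buf.remaining() < HEADER_BYTES) {
            throw new IllegalArgumentException("incomplete header: " + buf.remaining() + " bytes");
        }
        return new TransferHeader(buf.getInt(), buf.getInt(), buf.getLong(),
                buf.getInt(), buf.getLong(), buf.getLong());
    }
}
```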
Current state: the current HAConnectionState, which is TRANSFER.
Body size: the length of the body.
Offset: the starting offset of the current batch of logs.
Epoch: the MasterEpoch to which the current batch of logs belongs.
epochStartOffset: the StartOffset of the MasterEpoch corresponding to the current batch of logs.
confirmOffset: the minimum offset among the replicas in the SyncStateSet.
Body: the logs.

2. AutoSwitchHaClient (Slave) sends an ACK packet to the Master:
current state(4byte) + maxOffset(8byte)
Current state: the current HAConnectionState, which is TRANSFER.
MaxOffset: the Slave's current maximum log offset.

ElectMaster selects a new Master from the SyncStateSet list when the Master of a Broker replica group goes offline or becomes unreachable. The election is triggered either by the Controller itself or by the electMaster operation command.
Whether the Controller is deployed independently or embedded in Namesrv, it listens to the connection channels of each Broker. If a Broker channel becomes inactive, it checks whether the Broker is the Master, and if so, it triggers the Master election process.
Electing a Master is relatively simple: the Controller selects one replica from the SyncStateSet of the affected Broker group as the new Master, applies the result to its in-memory metadata through DLedger consensus, and finally notifies the corresponding Broker replica group of the result.
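A minimal sketch of the selection step, under stated assumptions: candidates are taken in SyncStateSet order, a candidate must still have an active channel (per the liveness check the Controller performs on Broker channels), and the new MasterEpoch is the current one plus one, which satisfies the required monotonic increase. Applying the result through DLedger and notifying the Broker group are left out; all names are hypothetical.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch of the Controller's election step for one Broker group.
final class ElectMasterSketch {
    static final class Result {
        final String newMaster;
        final int newMasterEpoch;

        Result(String newMaster, int newMasterEpoch) {
            this.newMaster = newMaster;
            this.newMasterEpoch = newMasterEpoch;
        }
    }

    // Picks the first SyncStateSet member (other than the old Master) whose channel is active.
    static Optional<Result> elect(List<String> syncStateSet, String oldMaster,
                                  Set<String> activeBrokers, int currentMasterEpoch) {
        for (String candidate : syncStateSet) {
            if (!candidate.equals(oldMaster) && activeBrokers.contains(candidate)) {
                // Assumption: the next epoch is simply the current epoch plus one.
                return Optional.of(new Result(candidate, currentMasterEpoch + 1));
            }
        }
        return Optional.empty(); // no eligible replica in the SyncStateSet
    }
}
```

Restricting candidates to the SyncStateSet is what makes the election safe: every member has acknowledged logs up to at least ConfirmOffset, so promoting any of them does not lose confirmed messages.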
The SyncStateSet is the key basis for electing a Master. Changes to it are mainly initiated by the Master Broker, which shrinks and expands the SyncStateSet through a periodic task and during the synchronization process, and submits each change to the Controller as an Alter SyncStateSet request.
Shrinking the SyncStateSet means removing replicas that lag significantly behind the Master, based on the following criteria:
If a Slave replica catches up with the Master, the Master must promptly send an Alter SyncStateSet request to the Controller. A Slave is added to the SyncStateSet when slaveAckOffset >= ConfirmOffset, where ConfirmOffset is the minimum MaxOffset among all replicas in the current SyncStateSet.
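The expansion condition reduces to a minimum and a comparison. This hypothetical helper assumes the Master tracks each SyncStateSet replica's MaxOffset (for Slaves, the maxOffset from their latest ACK) in a map keyed by replica address.

```java
import java.util.Map;

// Hypothetical helper for the SyncStateSet expansion check.
final class SyncStateSetExpansion {
    // ConfirmOffset: the minimum MaxOffset among replicas currently in the SyncStateSet.
    static long confirmOffset(Map<String, Long> maxOffsetByReplica) {
        return maxOffsetByReplica.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElseThrow(() -> new IllegalStateException("empty SyncStateSet"));
    }

    // A Slave qualifies for the SyncStateSet once slaveAckOffset >= ConfirmOffset.
    static boolean canJoin(long slaveAckOffset, Map<String, Long> maxOffsetByReplica) {
        return slaveAckOffset >= confirmOffset(maxOffsetByReplica);
    }
}
```

With the Master at 500 and one in-sync Slave at 420, ConfirmOffset is 420, so a new Slave can join once it has acknowledged offset 420.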