This document outlines the proposed recovery protocol for containers where one or more replicas are not cleanly closed or have potential data inconsistencies. It aims to provide an overview of the planned changes and their implications, focusing on the overall flow and key design decisions.
This proposal is motivated by the need to reconcile mismatched states and contents among container replicas. This covers:
Ideally, a healthy Ozone cluster would contain only open and closed container replicas. However, container replicas commonly end up with a mix of states including quasi-closed and unhealthy that the current system is not able to resolve to cleanly closed replicas. These states are often caused by bugs or broad failure handling on the write path. While we should fix these causes, they raise the problem that Ozone is not able to reconcile these mismatched container replica states on its own, regardless of their cause. This has led to significant complexity in the replication manager for how to handle cases where only quasi-closed and unhealthy replicas are available, especially in the case of decommissioning.
Even when all container replicas are closed, the system assumes that these closed container replicas are equal with no way to verify this. During writes a client provides a checksum for the chunk that is written. The scanner validates periodically that the checksums of the chunks on disk match the checksums provided by the client. It is possible that the checksum of a chunk on disk does not match the client provided checksum recorded at the time of write. Additionally, during container replica copying, the consistency of the data is not validated, opening the possibility of silent data corruption propagating through the system.
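To make the write-path guarantee concrete, the sketch below shows the kind of check the scanner performs: recompute the checksum of the chunk bytes on disk and compare it against the checksum the client supplied at write time. This is an illustrative stand-in only; CRC32 and the class and method names are assumptions, not the actual scanner code.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Illustrative sketch, not the Ozone scanner implementation. CRC32 stands in
// for whichever checksum type the client configured at write time.
public final class ChunkChecksumCheck {

  static long checksumOf(ByteBuffer chunkData) {
    CRC32 crc = new CRC32();
    crc.update(chunkData.duplicate());
    return crc.getValue();
  }

  // Returns true if the chunk bytes on disk still match the client-provided checksum.
  static boolean matchesClientChecksum(ByteBuffer onDiskChunk, long clientChecksum) {
    return checksumOf(onDiskChunk) == clientChecksum;
  }

  public static void main(String[] args) {
    ByteBuffer chunk = ByteBuffer.wrap("example chunk bytes".getBytes());
    long clientChecksum = checksumOf(chunk); // recorded at write time
    System.out.println("healthy = " + matchesClientChecksum(chunk, clientChecksum));
  }
}
```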
This document proposes a container reconciliation protocol to solve these problems. After implementing the proposal:
Note: This document does not cover the case where the checksums recorded at the time of write match the chunks locally within a Datanode but differ across replicas. We assume that the replication code path is correct and that the checksums are correct. If this is not the case, the system is already in a failed state and the reconciliation protocol will not be able to recover it. Chunks once written are not updated, thus this scenario is not expected to occur.
User Focus: Users prioritize data durability and availability above all else.
Focus on Recovery Paths: Focusing on the path to a failed state is secondary to focusing on the path out of failed states.
System Safety: If a decision made by software will make data more durable, a single trigger is sufficient. If a decision can potentially reduce durability of data or execute an unsafe operation (unlink, trim, delete), then the confidence level has to be high, the decision must be precise and clear, and preferably it should be made within services that have a wider view of the cluster (SCM/Recon).
Datanode Simplicity: Datanodes should be eager to make safe choices on their own, but should not act autonomously on unsafe ones.
The proposed solution involves defining a container level checksum that can be used to quickly tell whether two container replicas match based on their data. This container checksum can be defined as a three level Merkle tree:
When SCM sees that replicas of a non-open container have diverged container checksums, it can trigger a reconciliation process on all datanode replicas. SCM does not need to know which container hash is correct (if any of them are correct), only that all containers match. Datanodes will use their merkle tree and those of the other replicas to identify issues with their container. Next, datanodes can read the missing data from existing replicas and use it to repair their container replica.
Since the container hash is generated leveraging the checksum recorded at the time of writing, the container hash represents consistency of the data from a client perspective.
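To illustrate the idea (not the final implementation), the sketch below folds client-provided chunk checksums into block checksums, and block checksums into a single container checksum. All class and method names are hypothetical, and SHA-256 is only a placeholder for the hash used at each level.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.List;

// Illustrative sketch of the three-level container checksum: chunk checksums
// (level 1) are folded into block checksums (level 2), which are folded into
// the container checksum (level 3). Names are hypothetical.
public final class ContainerChecksumSketch {

  // A block checksum is a hash over its chunk checksums, in chunk write order.
  static byte[] blockChecksum(List<byte[]> chunkChecksums) throws Exception {
    MessageDigest digest = MessageDigest.getInstance("SHA-256");
    for (byte[] chunkChecksum : chunkChecksums) {
      digest.update(chunkChecksum);
    }
    return digest.digest();
  }

  // The container checksum is a hash over its block checksums, in block order.
  static byte[] containerChecksum(List<byte[]> blockChecksums) throws Exception {
    MessageDigest digest = MessageDigest.getInstance("SHA-256");
    for (byte[] blockChecksum : blockChecksums) {
      digest.update(blockChecksum);
    }
    return digest.digest();
  }

  public static void main(String[] args) throws Exception {
    // Two blocks, each with two client-provided chunk checksums.
    byte[] b1 = blockChecksum(List.of(bytes("chunk-1-cksum"), bytes("chunk-2-cksum")));
    byte[] b2 = blockChecksum(List.of(bytes("chunk-3-cksum"), bytes("chunk-4-cksum")));
    byte[] container = containerChecksum(List.of(b1, b2));
    System.out.println("container checksum length = " + container.length);
  }

  private static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }
}
```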
Add container level checksums that datanodes can compute and store.
Add a mechanism for datanodes to reconcile their replica of a container with another datanode's replica so that both replicas can be verified to be equal at the end of the process.
Add a mechanism for SCM to trigger this reconciliation as part of the existing heartbeat command protocol SCM uses to communicate with datanodes.
An ozone admin container reconcile <container-id> CLI that can be used to manually resolve diverged container states among non-open container replicas.
Show container checksums in ozone admin container info output.
Delete blocks that a Container Replica has not yet deleted.
Automate container reconciliation requests as part of SCM's replication manager.
Simplify SCM replication manager decommission and recovery logic based on mismatch of container checksums, instead of the combination of all possible container states.
The only extra information we will need to store is the container merkle tree on each datanode container replica. The current proposal is to store this separately as a proto file on disk so that it can be copied over the network exactly as stored. The structure would look something like this (not finalized, for illustrative purposes only):
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 718e2a108c7..d8d508af356 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -382,6 +382,7 @@ message ChunkInfo {
   repeated KeyValue metadata = 4;
   required ChecksumData checksumData = 5;
   optional bytes stripeChecksum = 6;
+  optional bool healthy = 7; // If all the chunks on disk match the expected checksums provided by the client during write
 }

 message ChunkInfoList {
@@ -525,3 +526,38 @@ service IntraDatanodeProtocolService {
   rpc download (CopyContainerRequestProto) returns (stream CopyContainerResponseProto);
   rpc upload (stream SendContainerRequest) returns (SendContainerResponse);
 }
+
+/*
+BlockMerkleTree stores the checksums of the chunks in a block.
+The Block checksum is derived from the checksums of the chunks in case of replicated blocks and derived from the
+metadata of the chunks in case of erasure coding.
+Two Blocks across container instances on two nodes have the same checksum if they have the same set of chunks.
+A Block upon deletion will be marked as deleted but will preserve the rest of the metadata.
+*/
+message BlockMerkleTree {
+  optional BlockData blockData = 1; // The chunks in this should be sorted by the order of chunks written.
+  optional ChecksumData checksumData = 2; // Checksum of the checksums of the chunks.
+  optional bool deleted = 3; // If the block is deleted.
+  optional int64 length = 4; // Length of the block.
+  optional int64 chunkCount = 5; // Number of chunks in the block.
+}
+
+/*
+ContainerMerkleTree stores the checksums of the blocks in a container.
+The Container checksum is derived from the checksums of the blocks.
+Two containers across container instances on two nodes have the same checksum if they have the same set of blocks.
+If a block is deleted within the container, the checksum of the container will remain unchanged.
+ */
+message ContainerMerkleTree {
+  enum FailureCause {
+    NO_HEALTHY_CHUNK_FOUND_WITH_PEERS = 1; // No healthy chunk found with peers.
+    NO_PEER_FOUND = 2; // No peer found.
+  }
+  optional int64 containerID = 1; // The container ID.
+  repeated BlockMerkleTree blockMerkleTrees = 2; // The blocks in this should be sorted by the order of blocks written.
+  optional ChecksumData checksumData = 3; // Checksum of the checksums of the blocks.
+  optional int64 length = 5; // Length of the container.
+  optional int64 blockCount = 6; // Number of blocks in the container.
+  optional FailureCause failureCause = 7; // The cause of the failure.
+  optional int64 reconciliationCount = 8; // The reconciliation count.
+}
This is written as a file to avoid bloating the RocksDB instance which is in the IO path.
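As a rough sketch of how such a file could be handled (the file naming and write-temp-then-rename strategy below are assumptions, not part of this proposal), a datanode could persist the generated protobuf message like this:

```java
import com.google.protobuf.Message;
import com.google.protobuf.Parser;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustrative sketch only: persist the container merkle tree proto as its own
// file beside the container data, so it never touches the RocksDB IO path.
public final class MerkleTreeFile {

  // Serialize the tree to a temp file and rename it into place so a crash
  // mid-write cannot leave a truncated tree file behind.
  static void write(Path treeFile, Message tree) throws IOException {
    Path tmp = treeFile.resolveSibling(treeFile.getFileName() + ".tmp");
    try (OutputStream out = Files.newOutputStream(tmp)) {
      tree.writeTo(out); // plain protobuf bytes, copyable over the network exactly as stored
    }
    Files.move(tmp, treeFile, StandardCopyOption.REPLACE_EXISTING);
  }

  // Read the tree back with the parser of the generated message class.
  static <T extends Message> T read(Path treeFile, Parser<T> parser) throws IOException {
    try (InputStream in = Files.newInputStream(treeFile)) {
      return parser.parseFrom(in);
    }
  }
}
```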
The following APIs would be added to datanodes to support container reconciliation. The actions performed when calling them are defined in Events.
reconcileContainer(containerID, List<Replica>)
The datanode will call getContainerHashes for each container replica to identify repairs needed, and use existing chunk read/write APIs to do the repairs necessary.
getContainerHashes(containerID)
Returns this replica's container merkle tree (container hashes) to the calling datanode.
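A hypothetical Java-level view of these two calls is sketched below; the names, signatures, and placeholder types are illustrative only, since the real calls would be new messages on the existing datanode protocols.

```java
import java.util.List;

// Hypothetical Java-level view of the two new datanode calls; names,
// signatures, and the placeholder types below are illustrative only.
interface ContainerReconciliationApi {

  // SCM asks this datanode to reconcile its replica of a container against
  // the listed peer replicas.
  void reconcileContainer(long containerID, List<Replica> peers);

  // A peer datanode asks this replica for its container merkle tree so the
  // caller can diff it against its own.
  ContainerMerkleTree getContainerHashes(long containerID);

  // Placeholders standing in for the real Ozone types.
  interface Replica { }
  interface ContainerMerkleTree { }
}
```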
SCM: Storage Container Manager
DN: Datanode
SCM sends a reconcile command to DN 1 for container 12 with replicas on DN 2 and DN 3.
SCM -> reconcileContainer(Container #12, DN2, DN3) -> DN 1
DN 1 schedules the Merkle Tree to be calculated if it is not already present and reports it to SCM. If DN 1 already has the Merkle Tree locally, it will compare it with the Merkle Trees of the other container replicas and schedule reconciliation if they are different. Example:
DN 1 -> getContainerHashes(Container #12) -> DN 2 // Datanode 1 gets the merkle tree of container 12 from Datanode 2.
DN 1 -> getContainerHashes(Container #12) -> DN 3 // Datanode 1 gets the merkle tree of container 12 from Datanode 3.
DN 1 checks if any blocks are missing. For each missing Block:
DN 1 -> readChunk(Container #12, Block #3, Chunk #1) -> DN 2 // Read the chunk from DN 2
DN 1 -> readChunk(Container #12, Block #3, Chunk #2) -> DN 3 // Read the chunk from DN 3
DN 1 checks if any blocks are corrupted. For each corrupted Block:
DN 1 -> readChunk(Container #12, Block #20, Chunk #13) -> DN 2
DN 1 -> readChunk(Container #12, Block #20, Chunk #21) -> DN 3
DN 1 deletes any blocks that are marked as deleted.
DN 1 recomputes the Merkle tree and sends it to SCM via ICR (Incremental Container Report).
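The walkthrough above could be orchestrated roughly as follows. This is a sketch with hypothetical helper types and methods, not the actual datanode code; it only mirrors the order of operations described in the steps.

```java
import java.util.List;

// Sketch of the walkthrough above using hypothetical helper types: fetch peer
// merkle trees, repair the differences, drop deleted blocks, recompute the
// tree, and report it back to SCM via ICR.
final class ReconcileContainerTask {

  void run(long containerID, List<Peer> peers) {
    MerkleTree localTree = loadOrComputeLocalTree(containerID);
    for (Peer peer : peers) {
      // getContainerHashes: pull the peer's merkle tree for this container.
      MerkleTree peerTree = peer.getContainerHashes(containerID);
      // Each difference is a missing or corrupted chunk to re-read from the peer.
      for (Diff diff : localTree.diff(peerTree)) {
        byte[] chunk = peer.readChunk(containerID, diff.blockID(), diff.chunkIndex());
        writeChunkLocally(containerID, diff, chunk);
      }
    }
    dropBlocksMarkedDeleted(containerID);
    MerkleTree updated = recomputeTree(containerID);
    sendIncrementalContainerReport(containerID, updated); // ICR back to SCM
  }

  // Placeholder types and no-op helpers for illustration only.
  interface Peer {
    MerkleTree getContainerHashes(long containerID);
    byte[] readChunk(long containerID, long blockID, int chunkIndex);
  }
  interface MerkleTree {
    List<Diff> diff(MerkleTree other);
  }
  record Diff(long blockID, int chunkIndex) { }

  MerkleTree loadOrComputeLocalTree(long containerID) { return other -> List.of(); }
  void writeChunkLocally(long containerID, Diff diff, byte[] data) { }
  void dropBlocksMarkedDeleted(long containerID) { }
  MerkleTree recomputeTree(long containerID) { return other -> List.of(); }
  void sendIncrementalContainerReport(long containerID, MerkleTree tree) { }
}
```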
getContainerHashes
When a datanode receives a request to get container hashes, the following steps are performed:
getContainerHashes will return an error.
Container is missing blocks
DN 1 has 10 blocks, DN 2 has 11 blocks, DN 3 has 12 blocks.
DN 1 will read the missing blocks from DN 2 and DN 3 and store them locally.
DN 1 will recompute the merkle tree and send it to SCM.
Container has corrupted chunks
DN 1 has block 20: chunk 13, DN 2 has block 12: chunk 13.
DN 1 will read a healthy copy of the corrupted block from DN 2 and store it locally.
DN 1 will recompute the merkle tree and send it to SCM.
Closed container has a chunk that is corrupted
A Datanode is still accepting writes and reads while the reconciliation process is ongoing. The reconciliation process is a background process that does not affect the normal operation of the Datanode. This section defines how container reconciliation will function as events occur in the system.
These events occur as part of container reconciliation in the happy path case.
In multiple scenarios it is safer to immediately create a container replica and not wait for reconciliation. Example: the cluster has only a single closed replica of the container.
These operations may happen to a container during reconciliation, and the container must remain consistent while they occur. Note that we cannot depend on SCM coordination to shelter datanodes from other commands after SCM has instructed them to reconcile, because datanode command queues may get backed up at various points and process commands in a different order, or long after they were initially issued.
This read could be an end client or another datanode asking for data to repair its replica. The result will only be inconsistent if the chunk/block is being repaired. In this case the data is already corrupt and the client will get a read checksum failure either way. Even if all datanodes are reconciling the same container at the same time, this should not cause an availability problem because there has to be one correct replica that others are repairing from that the client can also read.
There may be cases where a container is critically under-replicated and we need a good copy of the data as soon as possible. Meanwhile, reconciliation could remain blocked for an unrelated reason. We can provide a way to pause/unpause reconciliation between chunk writes to do reasonably consistent container exports.
When SCM sends block delete commands to datanodes, update the merkle tree when the datanode block deleting service runs and processes that block.
Merkle tree update should be done before RocksDB update to make sure it is persisted in case of a failure during delete.
Merkle tree update should not be done when a datanode first receives the block delete command from SCM, because this command only adds the delete block proto to the container. It does not iterate the blocks to be deleted, so we should not add that additional step here.
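A minimal sketch of that ordering, with placeholder method names, is shown below; the point is only that the merkle tree update is persisted before the RocksDB delete, so a failure between the two steps can be retried without losing the record of the delete.

```java
// Ordering sketch for the block deleting service (hypothetical method names,
// not the real datanode code): mark the block as deleted in the merkle tree
// file first, then remove the block from RocksDB. If the process fails between
// the two steps, the tree update is already persisted and the RocksDB delete
// can simply be retried.
final class BlockDeleteOrdering {

  void deleteBlock(long containerID, long blockID) {
    markDeletedInMerkleTree(containerID, blockID);   // persist the tree update first
    deleteBlockFromRocksDb(containerID, blockID);    // then the authoritative metadata delete
  }

  // Placeholders for the real datanode operations.
  void markDeletedInMerkleTree(long containerID, long blockID) { }
  void deleteBlockFromRocksDb(long containerID, long blockID) { }
}
```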
Since the scanner can generate the container merkle trees in the background, existing containers created before this feature will still be eligible for reconciliation. These old containers may not have all of their block deletes present in the merkle tree, however, which could cause some false positives about missing blocks on upgrade if one node had already deleted blocks from a container before the upgrade, and another datanode has not yet processed the delete of those blocks.
This can be mitigated by:
The sequence of events would look like this. Assume software v1 does not have container reconciliation, but v2 does.