| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| = Transactions Design Overview |
| |
| This document describes the implementation of multi-row transactions in Kudu. An initial design |
| document was written to provide Kudu-related context and explore possible implementations. That can |
| be found |
| link:https://docs.google.com/document/d/1qv7Zejpfzg-HvF5azRL49g5lRLQ4437EmJ53GiupcWQ/edit#[here]. |
| |
| Last updated: |
| |
| * *1.15.0*: experimental, `INSERT` and `INSERT_IGNORE` ops only. To enable, set |
| `--txn_manager_enabled` on the Kudu master and `--enable_txn_system_client_init` on Kudu tablet |
| servers, each with `--unlock_experimental_flags`. By default, transactions require at least three |
| tablet servers to host the replicas of a system table, but this can be adjusted by setting |
| `--txn_manager_status_table_num_replicas=1` on the master. |
| |
| == Terminology |
| |
| Some new terminology added with the introduction of transactions: |
| |
| *Transaction participant*: a tablet that has had ops sent to it within a transaction. “Participant” |
| refers to the logical tablet, in the same way that “tablet” refers to the logical partition of a |
| table. “Leader participant” and “follower participant” refer to the leader and follower replicas of |
| a tablet that is a participant in a transaction. A transaction may have many participants. |
| |
| *Transaction status table*: a distributed table with a logical schema of (`txn_id`, `commit_state`, |
| `start_timestamp`, `last_update_timestamp`, `commit_timestamp`, `list_of_participant_tablet_ids`). |
| This table is used to keep track of what transactions are in flight. |
| |
| *`TxnStatusManager`*: an entity that lives in-memory alongside each tablet replica of the |
| transaction status table. It is in charge of actually writing to the transaction status tablets, and |
| transitioning the in-memory transaction states through the lifecycle of each transaction. |
| |
| *`TxnManager`*: a proxy between the Kudu client and the `TxnStatusManagers` so the user client |
| doesn’t have to interact directly with the transaction status table internals (e.g. opening the |
| table, getting the table locations, etc). |
| |
| *`TxnSystemClient`*: encapsulates all the logic required for finding and sending RPCs to the |
| transaction status table, and to participants. This is used by the `TxnManager`, the transaction |
| status manager, and transaction participants. |
| |
| *Transaction handle*: a public-facing, client-side object that interacts with the `TxnManager` to |
| start, commit, and abort transactions, and to send heartbeats to the `TxnStatusManagers`. |
| |
| == A walkthrough of the commit path |
| |
| Before delving into the details of each component, below is a depiction of the transactional write |
| path. It is assumed that the transaction status table has the default replication factor (RF=3). |
| |
| image::design-docs_transactions.svg[Commit path] |
| |
| . A Kudu client sends a request to the `TxnManager` to begin a transaction. |
| . The `TxnManager` has cached the highest transaction ID it has seen so far (though this |
| could also be heartbeated around), and sends a request to the appropriate leader `TxnStatusManager` |
| to begin a transaction with the next transaction ID. |
| . If the transaction ID is accepted by the transaction status tablet (i.e. it’s |
| of an acceptable hash and range), a new status record is inserted to it. |
| . Upon successfully persisting the new status record in the transaction status table, the |
| `TxnStatusManager` returns a response to the `TxnManager`, who now knows of a new highest |
| transaction ID. |
| . The `TxnManager` returns to the Kudu client with the transaction ID. |
| . The Kudu client sends writes directly to the tablet servers, with the transaction ID attached to |
| every `WriteRequestPB`. The receiving tablet first checks whether it has been registered as a |
| participant with the `TxnStatusManager`. |
| . If the transaction ID hasn’t been registered with this participant, the participant uses the |
| transaction system client to register itself with the appropriate `TxnStatusManager`. The |
| `TxnStatusManager` writes a participant record that includes the participant’s tablet ID. |
| . The transaction participant leader replicates the client write to its followers. |
| . The transaction participant returns to the Kudu client with success. |
| . When the user wants to commit the transaction, the Kudu client sends a request to the `TxnManager` |
| to commit the transaction. |
| . The `TxnManager` sends a request to the `TxnStatusManager` to commit the transaction. |
| . The `TxnStatusManager` updates in-memory state to block further participants from registering, and |
| replicates an update to the status record to indicate the transaction has begun committing. |
| . The `TxnStatusManager` returns an ack to the `TxnManager`, which is returned to the client. |
| . Asynchronously, the `TxnStatusManager` sends requests to all registered participants to begin |
| committing. |
| . Each participant replicates this request, indicating that it will no longer accept new write |
| requests for this transaction. Each participant returns to the `TxnStatusManager` the timestamp |
| replicated for this record, which is guaranteed to be higher than all ops on this participant for |
| this transaction. |
| . The `TxnStatusManager` replicates the commit timestamp in the transaction status record. Past this |
| step, the transaction can no longer be aborted. |
| . The `TxnStatusManager` sends requests to finalize the commit on all participants. |
| . This request to finalize the commit is replicated to the replicas of the participants; upon |
| applying the commit, all state for the transaction is made visible on the participant. |
| . The `TxnStatusManager` replicates a record that the commit is complete. |
| |
| == Transactions system table and `TxnManagers` |
| |
| The `TxnManagers` are the clients’ first point of contact when operating on transactions. |
| `TxnManagers` are entities that currently live on the Kudu master nodes and serve as proxies to the |
| transaction status table, a distributed system table that stores metadata about every transaction in |
| the cluster. |
| |
| `TxnManagers` are mostly stateless and primarily forward requests that update the transaction |
| status table. The exception is that each `TxnManager` keeps track of the highest transaction ID |
| seen so far, allowing it to start transactions with a specific transaction ID. As a result, |
| users do not have to supply a transaction ID when starting a new transaction, and partitions of the |
| transaction status table do not need to coordinate among themselves to determine the next |
| transaction ID. |
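
The ID bookkeeping described above can be sketched as follows. This is a minimal illustration, not Kudu's implementation: the class `TxnIdAllocator` and its method names are hypothetical, and a real `TxnManager` would need this state to be thread-safe.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical helper (not a Kudu class): remembers the highest transaction
// ID seen so far and proposes the next one to try.
class TxnIdAllocator {
 public:
  // The ID to propose in the next request to begin a transaction.
  int64_t NextCandidate() const { return highest_seen_ + 1; }

  // Records an ID reported back by a TxnStatusManager. Lower IDs are ignored
  // so that the cached value never moves backwards.
  void Observe(int64_t txn_id) {
    highest_seen_ = std::max(highest_seen_, txn_id);
  }

 private:
  int64_t highest_seen_ = -1;  // No transaction has been seen yet.
};
```

Because only the cached maximum is kept, a `TxnManager` that restarts would simply relearn the value from the responses it receives.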
| |
| // TODO: file ticket about improving transaction ID assignment algorithm |
| |
| The `TxnManager` service exposes the following RPC endpoints to clients: |
| |
| * `BeginTransaction() => { txn_id, keepalive_millis }`: Starts a transaction in the `OPEN` state |
| * `CommitTransaction(txn_id) => {}`: Transitions a transaction from `OPEN` to `COMMIT_IN_PROGRESS` |
| * `AbortTransaction(txn_id) => {}`: Transitions a transaction from `OPEN` or `COMMIT_IN_PROGRESS` to |
| `ABORT_IN_PROGRESS` |
| * `GetTransactionState(txn_id) => { state }`: Returns the current state of a transaction |
| * `KeepTransactionAlive(txn_id) => {}`: Signals to the `TxnStatusManager` that a transaction should |
| be kept alive |
| |
| Each endpoint corresponds to a user-facing client API in the C++ and Java clients, with the |
| exception of the keep-alive endpoint, as keep-alive heartbeating is handled automatically by |
| transaction handles. |
| |
| Under the hood, the `TxnManager` wraps a `KuduClient` as the `TxnSystemClient`, which it uses to |
| lazily create, open, and alter the transaction status table, as well as to send requests to its |
| partitions. |
| By default, the table is created only once the first `BeginTransaction()` request is received. The |
| "lazy" initialization of the transactions status table can be controlled via the |
| `--txn_manager_lazily_initialized` master flag; when disabled, the `TxnManagers` will attempt to |
| create the table periodically upon starting up. |
| |
| The transaction status table is currently partitioned by range and starts off with a single range. |
| As new transactions are added and the partitions are filled up, new ranges are added to the table |
| automatically by the `TxnManagers`. The size of these partitions can be controlled via the |
| `--txn_manager_status_table_range_partition_span` master flag. |
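
As a rough sketch of the range arithmetic, assuming ranges are fixed-width and aligned to multiples of the span (an assumption made here for illustration), the range that must exist before a given transaction ID can be stored could be computed as below. The function name `RangeForTxnId` is hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

// Hypothetical helper: returns the [lower, upper) range of transaction IDs
// that would contain 'txn_id' when every range covers 'span' IDs.
std::pair<int64_t, int64_t> RangeForTxnId(int64_t txn_id, int64_t span) {
  assert(txn_id >= 0 && span > 0);
  const int64_t lower = (txn_id / span) * span;  // Round down to a multiple.
  return {lower, lower + span};
}
```

With a span of 1000000, transaction ID 2500000 would fall in the range starting at 2000000, so a `TxnManager` would need that range to be present before the transaction can begin.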
| |
| == `TxnStatusManager` |
| |
| Requests to a given transaction status table partition are received and managed by the |
| `TxnStatusManager`, which keeps track of the transactions and participants stored in the tablet. |
| Tablet replicas of tables of the TXN_STATUS_TABLE type are expected to have a specific schema: |
| |
| [source,sql] |
| ---- |
| CREATE TABLE kudu_system.kudu_transactions ( |
| txn_id INT64, |
| entry_type INT8, |
| identifier STRING, |
| metadata STRING, |
| PRIMARY KEY (txn_id, entry_type, identifier) |
| ) PARTITION BY RANGE (txn_id) |
| ( |
| PARTITION 0 <= VALUES < 1000000 |
| ) |
| ---- |
| |
| This schema allows the tablet replica to initialize a `TxnStatusManager` by reading its records. It |
| maintains an in-memory map of transaction IDs to transaction metadata, allowing it to serve the |
| current state or update the state. The primary role of the `TxnStatusManager` is to manage the |
| lifecycle of a transaction, transitioning it from state to state with the following allowed state |
| transitions. |
| |
| OPEN --BeginCommit--> COMMIT_IN_PROGRESS --FinalizeCommit--> FINALIZE_IN_PROGRESS --CompleteCommit--> COMMITTED |
| |
| OPEN --BeginCommit--> COMMIT_IN_PROGRESS --BeginAbort--> ABORT_IN_PROGRESS --FinalizeAbort--> ABORTED |
| |
| OPEN --AbortTxn--> ABORT_IN_PROGRESS --FinalizeAbort--> ABORTED |
| |
| The creation of an `OPEN` transaction, and the transitions to either `COMMIT_IN_PROGRESS` or |
| `ABORT_IN_PROGRESS` are initiated by users and are synchronous. All other transitions are performed |
| automatically by the `TxnStatusManager` background tasks, and their completion can be monitored by |
| clients by getting the transaction’s status. |
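
The allowed transitions listed above can be encoded as a lookup table. The sketch below is illustrative only: the enum and the function `IsAllowedTransition` are hypothetical names, and the table contains exactly the edges shown in this section.

```cpp
#include <set>
#include <utility>

// Transaction states as named in this document.
enum class TxnState {
  OPEN,
  COMMIT_IN_PROGRESS,
  FINALIZE_IN_PROGRESS,
  COMMITTED,
  ABORT_IN_PROGRESS,
  ABORTED,
};

// Hypothetical validator: returns true only for the edges listed above.
bool IsAllowedTransition(TxnState from, TxnState to) {
  using S = TxnState;
  static const std::set<std::pair<S, S>> kAllowed = {
      {S::OPEN, S::COMMIT_IN_PROGRESS},
      {S::COMMIT_IN_PROGRESS, S::FINALIZE_IN_PROGRESS},
      {S::FINALIZE_IN_PROGRESS, S::COMMITTED},
      {S::COMMIT_IN_PROGRESS, S::ABORT_IN_PROGRESS},
      {S::OPEN, S::ABORT_IN_PROGRESS},
      {S::ABORT_IN_PROGRESS, S::ABORTED},
  };
  return kAllowed.count(std::make_pair(from, to)) > 0;
}
```

Note that there is no edge out of `FINALIZE_IN_PROGRESS` other than to `COMMITTED`, which matches the rule that a transaction can no longer be aborted once its commit timestamp has been replicated.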
| |
| To update a given transaction, the `TxnStatusManager` writes the changes to disk, but does not yet |
| commit the changes to in-memory state. Once the update is replicated and persisted to the |
| transaction status tablet's replicas, the transaction state transition is committed and made visible |
| to users. |
| |
| === Leadership protection |
| |
| Much like the CatalogManager, only the leader `TxnStatusManager` is allowed to perform operations. |
| This is accomplished by taking a shared lock on the in-memory Raft term that is set when a |
| `TxnStatusManager` becomes leader of a term. When a `TxnStatusManager` receives a request, it checks |
| whether the current term is the same as the term set upon last becoming leader; if the term does |
| not match, the request is rejected, signaling that leadership has changed. |
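
A simplified sketch of the term check follows. The class `LeadershipTracker` is hypothetical, and unlike the description above it holds the shared lock only while comparing terms rather than for the duration of the request.

```cpp
#include <cstdint>
#include <mutex>
#include <shared_mutex>

// Hypothetical helper: remembers the Raft term in which this replica last
// became leader and lets request handlers compare against it.
class LeadershipTracker {
 public:
  // Called when the replica becomes leader of 'term'.
  void OnBecameLeader(int64_t term) {
    std::unique_lock<std::shared_mutex> l(lock_);
    leader_term_ = term;
  }

  // Returns true if a request handled while Raft reports 'current_term' may
  // proceed, i.e. the replica became leader in that same term.
  bool CheckLeader(int64_t current_term) const {
    std::shared_lock<std::shared_mutex> l(lock_);
    return leader_term_ == current_term;
  }

 private:
  mutable std::shared_mutex lock_;
  int64_t leader_term_ = -1;  // Never been leader.
};
```

Readers take the lock in shared mode so that concurrent requests do not serialize on the check; only a leadership change takes it exclusively.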
| |
| In the event of a network partition where a leader has become stale but still thinks it is leader, |
| updates to the transactions status table are protected by the underlying tablet’s write path: the |
| attempt to write to the table will be blocked by the Raft protocol, and an error will be returned, |
| signaling leadership has changed. |
| |
| To ensure consistency of in-memory state across term changes, once a `TxnStatusManager` is elected |
| leader, it reads the contents of the tablet, regenerating the in-memory state of all transactions. |
| |
| === Background orchestration tasks |
| |
| The transitioning from `COMMIT_IN_PROGRESS` or `ABORT_IN_PROGRESS` to their corresponding terminal |
| states, as well as orchestrating the appropriate participant RPCs, is managed by a set of tasks per |
| transaction. A similar pattern is used for each transition, so only the transition from |
| `COMMIT_IN_PROGRESS` to `COMMITTED` is described below. |
| |
| * Once the `TxnStatusManager` sets a transaction to `COMMIT_IN_PROGRESS`, it kicks off an |
| asynchronous RPC to each participant to begin committing. |
| * Upon returning, the last returned RPC writes the `FINALIZE_IN_PROGRESS` record to the |
| `TxnStatusManager`. |
| * Once written, the `TxnStatusManager` kicks off an asynchronous RPC to each participant to finalize |
| the commit. |
| * Upon returning, the last returned RPC writes the `COMMITTED` record to the `TxnStatusManager`. |
| |
| Since only the leader `TxnStatusManager` is meant to update transaction state, each time a task in |
| the above sequence returns from IO or from waiting, the `TxnStatusManager` checks that it is still |
| the leader. If not, it stops the task. Additionally, once a `TxnStatusManager` becomes leader, as it |
| reads the existing transaction states from disk, it begins tasks for any transaction that is in a |
| non-`OPEN`, non-terminal (i.e. neither `COMMITTED` nor `ABORTED`) state. |
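
The "last returned RPC writes the record" pattern used by these tasks can be sketched with an atomic countdown. `ParticipantCountdown` is a hypothetical name, and the callback stands in for writing the next status record; leadership checks are omitted.

```cpp
#include <atomic>
#include <functional>
#include <utility>

// Hypothetical helper: runs 'on_all_done' exactly once, from whichever
// participant callback happens to return last.
class ParticipantCountdown {
 public:
  ParticipantCountdown(int num_participants, std::function<void()> on_all_done)
      : remaining_(num_participants), on_all_done_(std::move(on_all_done)) {}

  // Called from each participant RPC's completion callback.
  void ParticipantDone() {
    // fetch_sub returns the previous value, so 1 means this was the last one.
    if (remaining_.fetch_sub(1) == 1) {
      on_all_done_();
    }
  }

 private:
  std::atomic<int> remaining_;
  std::function<void()> on_all_done_;
};
```

Using a single atomic counter avoids a dedicated coordinator thread: no callback blocks waiting for the others, and the follow-up write happens on the thread of the last response.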
| |
| === Heartbeating and staleness detection |
| |
| Clients send heartbeat messages to a `TxnStatusManager` in order to let it know that a transaction |
| is not abandoned. Otherwise, the leader `TxnStatusManager` automatically aborts abandoned |
| transactions. The leader `TxnStatusManager` keeps track of the last heartbeat for each transaction |
| in-memory only, making heartbeating a relatively lightweight operation. |
| |
| Each tablet server has a background thread that periodically goes through each `TxnStatusManager` |
| hosted on the server, and aborts all transactions that have not received a heartbeat within a |
| configured interval. Only transactions in the `OPEN` state are automatically aborted. |
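
One pass of such a staleness check might look like the sketch below. The struct `TxnEntry`, the function `FindStaleTxns`, and the use of `std::chrono::steady_clock` are assumptions made for illustration.

```cpp
#include <chrono>
#include <cstdint>
#include <unordered_map>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical in-memory record kept per transaction.
struct TxnEntry {
  bool is_open;                      // Only OPEN transactions are culled.
  Clock::time_point last_heartbeat;  // Kept in memory only.
};

// Returns the IDs of OPEN transactions whose last heartbeat is older than
// 'keepalive' as of 'now'.
std::vector<int64_t> FindStaleTxns(
    const std::unordered_map<int64_t, TxnEntry>& txns,
    Clock::time_point now,
    std::chrono::milliseconds keepalive) {
  std::vector<int64_t> stale;
  for (const auto& kv : txns) {
    const TxnEntry& entry = kv.second;
    if (entry.is_open && now - entry.last_heartbeat > keepalive) {
      stale.push_back(kv.first);
    }
  }
  return stale;
}
```

The caller would then request an abort for each returned ID; transactions that have already begun committing or aborting are left to their background tasks.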
| |
| == Transaction participants |
| |
| === Transaction state machine |
| |
| Transaction participants keep track of local transaction state, ensuring that transactional writes |
| are only accepted if the transaction has not yet been committed or aborted. To do this, participants |
| persistently (i.e. via Raft replication) keep track of the transaction state, described below: |
| |
| * `kInitializing`: the transaction has yet to be fully initialized. Replication may be in progress, |
| but we cannot consider the transaction as fully open yet. |
| * `kOpen`: the transaction is available for writes. |
| * `kCommitInProgress`: the `TxnStatusManager` has signaled to the participant that the transaction |
| should begin committing. The transaction can no longer accept new writes. |
| * `kCommitted`: the `TxnStatusManager` has finalized the commit. Transactional |
| rows should be visible to clients. |
| * `kAborted`: the `TxnStatusManager` has begun aborting the transaction. |
| |
| The following state transitions are expected: |
| |
| kInitializing --BEGIN_TXN--> kOpen --BEGIN_COMMIT--> kCommitInProgress --FINALIZE_COMMIT--> kCommitted |
| |
| kInitializing --BEGIN_TXN--> kOpen --ABORT_TXN--> kAborted |
| |
| As orchestration RPCs are replicated on the participant, these transitions are applied to |
| the underlying tablet, updating an in-memory registry of `TxnMetadata` objects per ID, and |
| persisting the states with tablet metadata. |
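
The participant-side transitions can be sketched as an op-driven function. The names `ParticipantOp`, `NextState`, and `AcceptsWrites` are hypothetical, and the sketch encodes only the transitions drawn above.

```cpp
// Participant states as named in this document.
enum class ParticipantState {
  kInitializing,
  kOpen,
  kCommitInProgress,
  kCommitted,
  kAborted,
};

// Orchestration ops replicated on the participant.
enum class ParticipantOp { BEGIN_TXN, BEGIN_COMMIT, FINALIZE_COMMIT, ABORT_TXN };

// Hypothetical helper: if 'op' is expected in state 'current', stores the
// resulting state in 'next' and returns true; otherwise returns false.
bool NextState(ParticipantState current, ParticipantOp op,
               ParticipantState* next) {
  using S = ParticipantState;
  switch (op) {
    case ParticipantOp::BEGIN_TXN:
      if (current != S::kInitializing) return false;
      *next = S::kOpen;
      return true;
    case ParticipantOp::BEGIN_COMMIT:
      if (current != S::kOpen) return false;
      *next = S::kCommitInProgress;
      return true;
    case ParticipantOp::FINALIZE_COMMIT:
      if (current != S::kCommitInProgress) return false;
      *next = S::kCommitted;
      return true;
    case ParticipantOp::ABORT_TXN:
      if (current != S::kOpen) return false;
      *next = S::kAborted;
      return true;
  }
  return false;
}

// Transactional writes are only accepted while the transaction is open.
bool AcceptsWrites(ParticipantState s) { return s == ParticipantState::kOpen; }
```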
| |
| // TODO: point to the change that suggests removing metadata once all in-memory stores are flushed. |
| |
| === Registering participants |
| |
| Before a participant is able to participate in a transaction, it must register itself with the |
| `TxnStatusManager`, and replicate an op that demarcates the beginning of the transaction on all |
| replicas of the participant. |
| |
| To ensure this happens, when a transactional write is first received by a leader participant, it |
| first checks to see if it has already completed these preparatory steps. It does so by keeping an |
| in-memory map of “dispatchers”, one per active transaction. A participant’s dispatcher caches |
| whether or not the participant has completed the steps, and if so, simply lets the write through to |
| the prepare pool. |
| |
| Otherwise it keeps the write request in a queue temporarily, using the `TxnSystemClient` to |
| asynchronously register itself as a participant with the `TxnStatusManager`, and then replicate the |
| op to begin the transaction. Once the request is complete, the queued write requests are submitted |
| to the prepare threadpool. If either step returns an error, the error is passed back to the writer, |
| ensuring that retriable errors (e.g. leadership changes) lead to the transactional write op being |
| retried, and non-retriable errors (e.g. invalid transaction ID) are shown to users. |
| |
| Once the transaction has begun committing on the participant, or begun aborting, the transaction’s |
| dispatcher is unregistered. Further attempts to write to the transaction may instantiate a new |
| dispatcher, but the attempt to register the participant will fail, given the registration will be |
| rejected by the `TxnStatusManager`. |
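
The queue-then-submit behavior of a dispatcher can be sketched as below. This is a single-threaded illustration under stated assumptions: the class `TxnWriteDispatcher`, its methods, and the use of an `int` to stand in for a write request are all hypothetical.

```cpp
#include <functional>
#include <utility>
#include <vector>

// Hypothetical per-transaction dispatcher. Writes are held back until the
// participant has registered and replicated the op that begins the transaction.
class TxnWriteDispatcher {
 public:
  explicit TxnWriteDispatcher(std::function<void(int)> submit_to_prepare_pool)
      : submit_(std::move(submit_to_prepare_pool)) {}

  // Lets the write through if the preparatory steps are done, else queues it.
  void Dispatch(int write_id) {
    if (registered_) {
      submit_(write_id);
    } else {
      queued_.push_back(write_id);
    }
  }

  // Called once registration and the begin-transaction op have succeeded.
  void OnRegistered() {
    registered_ = true;
    for (int write_id : queued_) submit_(write_id);
    queued_.clear();
  }

  // Called if either step failed: hands back the queued writes so that the
  // caller can return the error to each writer.
  std::vector<int> OnRegistrationFailed() {
    std::vector<int> failed;
    failed.swap(queued_);
    return failed;
  }

 private:
  std::function<void(int)> submit_;
  std::vector<int> queued_;
  bool registered_ = false;
};
```

Caching the `registered_` flag means that only the first write of a transaction on a tablet pays for the round trip to the `TxnStatusManager`.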
| |
| === Participant commit and MVCC |
| |
| The “commit” condition is different for transactions than regular write ops, which only need to be |
| considered “applied” to be visible to users. The goals for commit are: |
| |
| * Stop accepting writes once a `BEGIN_COMMIT` op has been replicated on a participant. |
| * Only show results that have been committed, as indicated by the replication of a `FINALIZE_COMMIT` |
| op on a participant, which contains a commit timestamp selected by the `TxnStatusManager` after |
| receiving all `BEGIN_COMMIT` op responses. |
| |
| To accomplish this, when finishing the replication of a `BEGIN_COMMIT` op, rather than demarcating |
| the associated MVCC op as completed, the Txn keeps track of the `BEGIN_COMMIT` MVCC op timestamp, |
| allowing a `FINALIZE_COMMIT` op to complete replicating, and then mark the `BEGIN_COMMIT` MVCC op as |
| applied. |
| |
| The commit timestamp is selected by the `TxnStatusManager` to be the highest of all `BEGIN_COMMIT` |
| op timestamps across participants, so it is greater than or equal to the `BEGIN_COMMIT` timestamp |
| on every participant. Because each `BEGIN_COMMIT` MVCC op stays unapplied until the commit is |
| finalized, when serving a scan at time t, it is sufficient to wait until all ops before t are |
| applied. |
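
The selection rule is a plain maximum over the participants' responses. The function name `SelectCommitTimestamp` is hypothetical, and timestamps are modeled as unsigned 64-bit integers for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: picks the commit timestamp as the highest of the
// BEGIN_COMMIT timestamps returned by the participants.
uint64_t SelectCommitTimestamp(
    const std::vector<uint64_t>& begin_commit_timestamps) {
  // A transaction with no participants has nothing to select from.
  assert(!begin_commit_timestamps.empty());
  return *std::max_element(begin_commit_timestamps.begin(),
                           begin_commit_timestamps.end());
}
```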
| |
| When reading rows, the commit condition changes depending on the kind of snapshot we are using: |
| |
| * *Timestamp (as in `READ_AT_SNAPSHOT`, `READ_YOUR_WRITES`, diff scan)*: The transaction is |
| considered committed if the `TxnMetadata` has both a `BEGIN_COMMIT` op that has been applied and a |
| commit timestamp that falls within the range of the snapshot. |
| * *Latest (as in `READ_LATEST`)*: The transaction is considered committed if the `TxnMetadata` has a |
| `BEGIN_COMMIT` op that has been applied, since we only apply the `BEGIN_COMMIT` op after |
| replicating the `FINALIZE_COMMIT` op. |
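
The two commit conditions can be written as predicates over a simplified view of the transaction metadata. The struct `TxnMeta` and both function names are hypothetical, and treating "within the range of the snapshot" as a strict less-than comparison is an assumption made for this sketch.

```cpp
#include <cstdint>

// Hypothetical, simplified view of a participant's transaction metadata.
struct TxnMeta {
  bool begin_commit_applied;   // BEGIN_COMMIT MVCC op has been applied.
  bool has_commit_timestamp;   // Set once FINALIZE_COMMIT has been replicated.
  uint64_t commit_timestamp;   // Only meaningful if has_commit_timestamp.
};

// Timestamp snapshots: committed only if the commit timestamp is covered by
// the snapshot (assumed here to mean strictly below 'snapshot_ts').
bool IsCommittedAtTimestamp(const TxnMeta& m, uint64_t snapshot_ts) {
  return m.begin_commit_applied && m.has_commit_timestamp &&
         m.commit_timestamp < snapshot_ts;
}

// Latest snapshots: an applied BEGIN_COMMIT op is enough, since it is only
// applied after FINALIZE_COMMIT has been replicated.
bool IsCommittedLatest(const TxnMeta& m) { return m.begin_commit_applied; }
```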
| |
| === Transactional MemRowSets |
| |
| In addition to the single MRS that tablets traditionally have, tablets now maintain a map of |
| uncommitted transaction ID to dirty MRS, and a set of committed MRSs that were inserted as a part of |
| transactions. Each such MRS has a shared reference to a `TxnMetadata` instance that is maintained as |
| a part of `TabletMetadata`. |
| |
| When a transaction is begun on a participant, a transactional MRS is created for it. Transactional |
| write ops first check all DRSs for row presence, then the main, non-transactional MRS, then finally |
| attempt to insert to the transactional MRS. As `FINALIZE_COMMIT` ops are applied, uncommitted MRSs |
| are moved to the set of committed MRSs. |
| |
| When scanning through a transactional MRS and evaluating whether a base insert is relevant to a |
| given scan, Kudu checks whether the MRS’s `TxnMetadata` should be considered committed under the |
| given MVCC snapshot. Updates to the base inserts are evaluated as normal, wherein the visibility |
| condition is based on whether the updates have been applied (updates are not yet supported). |
| |
| Transactional MemRowSets are not flushed to disk until they are fully committed, at which point the |
| memory usage of all committed MRSs is lumped together when considering whether to flush. When |
| flushing, all committed MRSs are taken to be the flush input, similar to a merge compaction, and |
| DRSs are written as though we were compacting multiple MRSs. When flushed, on-disk timestamps are |
| written as normal, using the rows’ commit timestamps, so that later scans do not need to consult |
| transaction metadata to evaluate visibility. |
| |
| === Locking and deadlock protection |
| |
| Currently, per-transaction partition-level locking is supported, in tandem with per-op row-level |
| locking. Each tablet may participate in at most one transaction at a time. |
| |
| To avoid deadlocks, if a transaction attempts to acquire a partition lock that is held by a |
| transaction with a lower transaction ID, the requesting transaction aborts itself: the tablet |
| server sends a best-effort request to the `TxnStatusManager` to abort the transaction, i.e. the |
| caller transaction “dies”. If instead the requesting transaction has the lower transaction ID, the |
| op should be retried, i.e. the caller transaction “waits”. |
| |
| To ensure rows are properly locked in the presence of both transactional and non-transactional |
| workloads, all non-transactional write ops also take the partition lock with the maximum transaction |
| ID. This means that all transactional write ops will wait for non-transactional writes to finish, |
| and non-transactional writes will abort in the presence of a multi-row transaction inserting rows |
| into the same tablet. |
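
The wait-die rule described in this section reduces to a comparison of transaction IDs. In the sketch below, `LockDecision`, `kNonTxnId`, and `DecideOnConflict` are hypothetical names, and treating equal IDs as the same lock holder is an assumption.

```cpp
#include <cstdint>
#include <limits>

enum class LockDecision { kAcquire, kWait, kDie };

// Non-transactional writes take the partition lock with the maximum ID.
constexpr int64_t kNonTxnId = std::numeric_limits<int64_t>::max();

// Hypothetical helper: decides what a requester should do when the partition
// lock is already held by 'holder_txn_id'.
LockDecision DecideOnConflict(int64_t requester_txn_id, int64_t holder_txn_id) {
  if (requester_txn_id == holder_txn_id) {
    return LockDecision::kAcquire;  // Assumed: same holder, no conflict.
  }
  if (requester_txn_id > holder_txn_id) {
    return LockDecision::kDie;  // Requester is newer than the holder: abort.
  }
  return LockDecision::kWait;  // Requester is older than the holder: retry.
}
```

Since non-transactional writes carry the maximum ID, they always compare as newer than any transaction: they abort when a transaction holds the lock, while a transaction waits when a non-transactional write holds it.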
| |
| == Transactions client API |
| |
| Both the C++ and Java clients leverage the existing session-based API that users have come to know. |
| With transactions, however, there is also the concept of transaction handles, which serve as the |
| initiators of transactions, the vehicle with which to create transactional sessions, the object |
| with which to orchestrate the commit or rollback of a transaction, and a means to signal activity on |
| a transaction via automatic heartbeating to prevent the `TxnStatusManager` from culling stale |
| transactions. |
| |
| [source,c++] |
| ---- |
| shared_ptr<KuduTransaction> txn; |
| KUDU_RETURN_NOT_OK(client->NewTransaction(&txn)); |
| shared_ptr<KuduSession> session; |
| KUDU_RETURN_NOT_OK(txn->CreateSession(&session)); |
| // ... insert rows to 'session' |
| KUDU_RETURN_NOT_OK(session->Flush()); |
| KUDU_RETURN_NOT_OK(txn->Commit()); |
| ---- |
| |
| [source,java] |
| ---- |
| KuduTransaction txn = client.newTransaction(); |
| KuduSession session = txn.newKuduSession(); |
| // ... insert rows to 'session' |
| session.flush(); |
| txn.commit(); |
| ---- |
| |
| === Heartbeating |
| |
| Under the hood, each newly-created transaction handle, while kept in scope, also heartbeats through |
| to the `TxnStatusManagers` to signal activity for the transaction, preventing it from being culled |
| by the transaction staleness checkers. Client applications do not need to explicitly heartbeat. |
| |
| === Serialization |
| |
| Given we expect there to be distributed actors that participate in a given transaction, Kudu also |
| exposes a way to transmit transaction handles across processes. Rather than exposing internal |
| details like the transaction ID, we serialize and deserialize a `TxnTokenPB` that contains metadata |
| about the transaction. |
| |
| [source,c++] |
| ---- |
| string txn_token; |
| shared_ptr<KuduTransaction> txn; |
| KUDU_RETURN_NOT_OK(client->NewTransaction(&txn)); |
| KUDU_RETURN_NOT_OK(txn->Serialize(&txn_token)); |
| |
| // Typically done in another process, after transmitting 'txn_token'. |
| shared_ptr<KuduTransaction> same_txn; |
| KUDU_RETURN_NOT_OK(KuduTransaction::Deserialize(client, txn_token, &same_txn)); |
| ---- |
| |
| [source,java] |
| ---- |
| KuduTransaction txn = client.newTransaction(); |
| byte[] txn_token = txn.serialize(); |
| KuduTransaction sameTxn = KuduTransaction.deserialize(txn_token, asyncClient); |
| ---- |
| |
| Since we typically expect there to be a single driver of a transaction and multiple actors as |
| participants of the transaction, by default, deserialized transaction handles do not heartbeat. |
| The expectation is that the drivers will continue heartbeating until the transaction is complete. |
| This can be toggled by passing customized `SerializationOptions`. |
| |