// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
= Transactions Design Overview
This document describes the implementation of multi-row transactions in Kudu. An initial design
document was written to provide Kudu-related context and explore possible implementations. That can
be found
link:https://docs.google.com/document/d/1qv7Zejpfzg-HvF5azRL49g5lRLQ4437EmJ53GiupcWQ/edit#[here].
Last updated:
* *1.15.0*: experimental, `INSERT` and `INSERT_IGNORE` ops only. To enable, set
`--txn_manager_enabled` on the Kudu master and `--enable_txn_system_client_init` on Kudu tablet
servers, each with `--unlock_experimental_flags`. By default, transactions require at least three
tablet servers, which host the replicas of a system table; this can be adjusted by setting
`--txn_manager_status_table_num_replicas=1` on the master.
== Terminology
Some new terminology added with the introduction of transactions:
*Transaction participant*: a tablet that has had ops sent to it within a transaction. “Participant”
refers to the logical tablet, in the same way that “tablet” refers to the logical partition of a
table. “Leader participant” and “follower participant” refer to the leader and follower replicas of
a tablet that is a participant in a transaction. A transaction may have many participants.
*Transaction status table*: a distributed table with a logical schema of (`txn_id`, `commit_state`,
`start_timestamp`, `last_update_timestamp`, `commit_timestamp`, `list_of_participant_tablet_ids`).
This table is used to keep track of what transactions are in flight.
*`TxnStatusManager`*: an entity that lives in-memory alongside each tablet replica of the
transaction status table. It is in charge of actually writing to the transaction status tablets, and
transitioning the in-memory transaction states through the lifecycle of each transaction.
*`TxnManager`*: a proxy between the Kudu client and the `TxnStatusManagers` so the user client
doesn’t have to interact directly with the transaction status table internals (e.g. opening the
table, getting the table locations, etc).
*`TxnSystemClient`*: encapsulates all the logic required for finding and sending RPCs to the
transaction status table, and to participants. This is used by the `TxnManager`, the transaction
status manager, and transaction participants.
*Transaction handle*: a public-facing, client-side object that interacts with
the `TxnManager` to start, commit, abort, and heartbeat to the `TxnStatusManagers`.
== A walkthrough of the commit path
Before delving into the details of each component, below is a depiction of the transactional write
path. It is assumed that the transaction status table has the default replication factor (RF=3).
image::../images/design-docs_transactions.png[Commit path]
. A Kudu client sends a request to the `TxnManager` to begin a transaction.
. The `TxnManager` has cached the highest transaction ID seen so far (though this
could also be heartbeated around), and sends a request to the appropriate leader `TxnStatusManager`
to begin a transaction with the next transaction ID.
. If the transaction ID is accepted by the transaction status tablet (i.e. it’s
of an acceptable hash and range), a new status record is inserted to it.
. Upon successfully persisting the new status record in the transaction status table, the
`TxnStatusManager` returns a response to the `TxnManager`, who now knows of a new highest
transaction ID.
. The `TxnManager` returns to the Kudu client with the transaction ID.
. The Kudu client sends writes directly to the tablet servers, but with a transaction ID associated
with every `WriteRequestPB`. The receiving participant first checks whether it has already
registered itself with the `TxnStatusManager` for this transaction.
. If the participant has not yet registered for this transaction, it uses the transaction system
client to register itself with the appropriate `TxnStatusManager`. The `TxnStatusManager` writes a
participant record that includes the participant’s tablet ID.
. The transaction participant leader replicates the client write to its followers.
. The transaction participant returns to the Kudu client with success.
. When the user wants to commit the transaction, the Kudu client sends a request to the `TxnManager`
to commit the transaction.
. The `TxnManager` sends a request to the `TxnStatusManager` to commit the transaction.
. The `TxnStatusManager` updates in-memory state to block further participants from registering, and
replicates an update to the status record to indicate the transaction has begun committing.
. The `TxnStatusManager` returns an ack to the `TxnManager`, which is returned to the client.
. Asynchronously, the `TxnStatusManager` sends requests to all registered participants to begin
committing.
. Each participant replicates this request, indicating that it will no longer accept new write
requests for this transaction. Each participant returns to the `TxnStatusManager` the timestamp
replicated for this record, which is guaranteed to be higher than all ops on this participant for
this transaction.
. The `TxnStatusManager` replicates the commit timestamp in the transaction status record. Past this
step, the transaction can no longer be aborted.
. The `TxnStatusManager` sends requests to finalize the commit on all participants.
. This request to finalize the commit is replicated to the replicas of the participants; upon
applying the commit, all state for the transaction is made visible on the participant.
. The `TxnStatusManager` replicates a record that the commit is complete.
== Transactions system table and `TxnManagers`
The `TxnManagers` are the clients’ first point of contact when operating on transactions.
`TxnManagers` are entities that currently live on the Kudu master nodes and serve as proxies to the
transaction status table, a distributed system table that stores metadata about every transaction in
the cluster.
`TxnManagers` are mostly stateless and mostly serve to send requests that update the transaction
status table, with the exception that `TxnManagers` keep track of the highest transaction ID seen so
far, allowing them to make requests to start transactions with a specific transaction ID. In doing so,
users do not have to supply a transaction ID when starting a new transaction, and partitions of the
transaction status table do not need to coordinate among themselves to determine the next
transaction ID.
// TODO: file ticket about improving transaction ID assignment algorithm
The `TxnManager` service exposes the following RPC endpoints to clients:
* `BeginTransaction() => { txn_id, keepalive_millis }`: Starts a transaction in the `OPEN` state
* `CommitTransaction(txn_id) => {}`: Transitions a transaction from `OPEN` to `COMMIT_IN_PROGRESS`
* `AbortTransaction(txn_id) => {}`: Transitions a transaction from `OPEN` or `COMMIT_IN_PROGRESS` to
`ABORT_IN_PROGRESS`
* `GetTransactionState(txn_id) => { state }`: Returns the current state of a transaction
* `KeepTransactionAlive(txn_id) => {}`: Signals to the `TxnStatusManager` that a transaction should
be kept alive
Each endpoint corresponds to a user-facing client API in the C++ and Java clients, with the
exception of the keep-alive endpoint, as keep-alive heartbeating is handled automatically by
transaction handles.
Under the hood, the `TxnManager` wraps a `KuduClient` as the `TxnSystemClient`, which it uses to
lazily create, open, and alter the transaction status table, as well as to send requests to its
partitions.
By default, the table is created only once the first `BeginTransaction()` request is received. The
"lazy" initialization of the transactions status table can be controlled via the
`--txn_manager_lazily_initialized` master flag; when disabled, the `TxnManagers` will attempt to
create the table periodically upon starting up.
The transaction status table is currently partitioned by range and starts off with a single range.
As new transactions are added and the partitions are filled up, new ranges are added to the table
automatically by the `TxnManagers`. The size of these partitions can be controlled via the
`--txn_manager_status_table_range_partition_span` master flag.
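
As a rough illustration of how fixed-width ranges could map transaction IDs to partitions, the sketch below computes the bounds of the range containing a given ID. The `TxnIdRange` struct and `RangeFor` helper are hypothetical names rather than Kudu APIs, and the sketch assumes that ranges are aligned to multiples of the span, which this document does not state.

[source,c++]
----
#include <cstdint>

// Hypothetical half-open range [lower, upper) of transaction IDs.
struct TxnIdRange {
  int64_t lower;
  int64_t upper;
};

// Returns the fixed-width range containing 'txn_id', assuming ranges are
// aligned to multiples of 'span', and that txn_id >= 0 and span > 0.
inline TxnIdRange RangeFor(int64_t txn_id, int64_t span) {
  const int64_t lower = (txn_id / span) * span;
  return {lower, lower + span};
}
----

Under these assumptions, a transaction ID that falls outside every existing range tells the caller which range would need to be added before the transaction can begin.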
== `TxnStatusManager`
Requests to a given transaction status table partition are received and managed by the
`TxnStatusManager`, which keeps track of the transactions and participants stored in the tablet.
Tablet replicas of tables of the TXN_STATUS_TABLE type are expected to have a specific schema:
[source,sql]
----
CREATE TABLE kudu_system.kudu_transactions (
txn_id INT64,
entry_type INT8,
identifier STRING,
metadata STRING,
PRIMARY KEY (txn_id, entry_type, identifier)
) PARTITION BY RANGE (txn_id)
(
PARTITION 0 <= VALUES < 1000000
)
----
This schema allows the tablet replica to initialize a `TxnStatusManager` by reading its records. It
maintains an in-memory map of transaction IDs to transaction metadata, allowing it to serve the
current state or update the state. The primary role of the `TxnStatusManager` is to manage the
lifecycle of a transaction, transitioning it from state to state with the following allowed state
transitions.

  BeginCommit           FinalizeCommit           CompleteCommit
 OPEN --> COMMIT_IN_PROGRESS --> FINALIZE_IN_PROGRESS --> COMMITTED

  BeginCommit             BeginAbort          FinalizeAbort
 OPEN --> COMMIT_IN_PROGRESS --> ABORT_IN_PROGRESS --> ABORTED

   AbortTxn            FinalizeAbort
 OPEN --> ABORT_IN_PROGRESS --> ABORTED

The creation of an `OPEN` transaction, and the transitions to either `COMMIT_IN_PROGRESS` or
`ABORT_IN_PROGRESS` are initiated by users and are synchronous. All other transitions are performed
automatically by the `TxnStatusManager` background tasks, and their completion can be monitored by
clients by getting the transaction’s status.
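
The allowed transitions can be captured in a small lookup, shown here as a minimal sketch. The `TxnState` enum mirrors the state names used in this document, while `IsAllowedTransition` is a hypothetical helper and not the actual `TxnStatusManager` code.

[source,c++]
----
#include <set>
#include <utility>

// Illustrative mirror of the transaction states named in this document.
enum class TxnState {
  OPEN,
  COMMIT_IN_PROGRESS,
  FINALIZE_IN_PROGRESS,
  COMMITTED,
  ABORT_IN_PROGRESS,
  ABORTED,
};

// Returns true if 'from' -> 'to' is one of the transitions listed above.
inline bool IsAllowedTransition(TxnState from, TxnState to) {
  static const std::set<std::pair<TxnState, TxnState>> kAllowed = {
    {TxnState::OPEN, TxnState::COMMIT_IN_PROGRESS},
    {TxnState::COMMIT_IN_PROGRESS, TxnState::FINALIZE_IN_PROGRESS},
    {TxnState::FINALIZE_IN_PROGRESS, TxnState::COMMITTED},
    {TxnState::COMMIT_IN_PROGRESS, TxnState::ABORT_IN_PROGRESS},
    {TxnState::OPEN, TxnState::ABORT_IN_PROGRESS},
    {TxnState::ABORT_IN_PROGRESS, TxnState::ABORTED},
  };
  return kAllowed.count({from, to}) > 0;
}
----

Note that no transition leaves `FINALIZE_IN_PROGRESS` for an abort state, which matches the commit path walkthrough: once the commit timestamp is replicated, the transaction can no longer be aborted.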
To update a given transaction, the `TxnStatusManager` writes the changes to disk, but does not yet
commit the changes to in-memory state. Once the update is replicated and persisted to the
transaction status tablet's replicas, the transaction state transition is committed and made visible
to users.
=== Leadership protection
Much like the CatalogManager, only the leader `TxnStatusManager` is allowed to perform operations.
This is accomplished by taking a shared lock on an in-memory Raft term that is set when a
`TxnStatusManager` becomes leader of a term. If a `TxnStatusManager` receives a request, it checks
to see if the current term is the same as the term set upon last becoming leader -- if the term does
not match, the request is rejected, signalling leadership has changed.
In the event of a network partition where a leader has become stale but still thinks it is leader,
updates to the transactions status table are protected by the underlying tablet’s write path: the
attempt to write to the table will be blocked by the Raft protocol, and an error will be returned,
signaling leadership has changed.
To ensure consistency of in-memory state across term changes, once a `TxnStatusManager` is elected
leader, it reads the contents of the tablet, regenerating the in-memory state of all transactions.
=== Background orchestration tasks
The transitioning from `COMMIT_IN_PROGRESS` or `ABORT_IN_PROGRESS` to their corresponding terminal
states, as well as orchestrating the appropriate participant RPCs, is managed by a set of tasks per
transaction. A similar pattern is used for each transition, so only the transition from
`COMMIT_IN_PROGRESS` to `COMMITTED` is described below.
* Once the `TxnStatusManager` sets a transaction to `COMMIT_IN_PROGRESS`, it kicks off an
asynchronous RPC to each participant to begin committing.
* Once all of these RPCs have returned, the last one to return writes the `FINALIZE_IN_PROGRESS`
record to the `TxnStatusManager`.
* Once written, the `TxnStatusManager` kicks off an asynchronous RPC to each participant to finalize
the commit.
* Once all of these RPCs have returned, the last one to return writes the `COMMITTED` record to the
`TxnStatusManager`.
Since only the leader `TxnStatusManager` is meant to update transaction state, each time a task in
the above sequence returns from doing IO or from waiting, the `TxnStatusManager` checks that it is
still the leader. If not, it stops the task. Additionally, once a `TxnStatusManager` becomes
leader, as it reads the existing transaction states from disk, it begins tasks for any transaction
that is in a non-`OPEN`, non-terminal (i.e. not `COMMITTED`, not `ABORTED`) state.
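
The pattern in which the last participant RPC to return triggers the next write can be sketched with an atomic counter. `ParticipantCountdown` is a hypothetical helper, not a Kudu class, and the sketch leaves out the leadership checks and error handling that the real tasks would need.

[source,c++]
----
#include <atomic>
#include <functional>
#include <utility>

// Hypothetical helper: runs 'on_all_done' exactly once, when the last of
// 'num_participants' responses arrives.
class ParticipantCountdown {
 public:
  ParticipantCountdown(int num_participants, std::function<void()> on_all_done)
      : remaining_(num_participants), on_all_done_(std::move(on_all_done)) {}

  // Invoked from each participant RPC's callback.
  void OnResponse() {
    // fetch_sub returns the previous value, so 1 means this was the last one.
    if (remaining_.fetch_sub(1) == 1) {
      on_all_done_();
    }
  }

 private:
  std::atomic<int> remaining_;
  std::function<void()> on_all_done_;
};
----

In this sketch, `on_all_done` would stand in for writing the `FINALIZE_IN_PROGRESS` or `COMMITTED` record, and no separate coordinator thread has to wait on the responses.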
=== Heartbeating and staleness detection
Clients send heartbeat messages to a `TxnStatusManager` in order to let it know that a transaction
is not abandoned. Otherwise, the leader `TxnStatusManager` automatically aborts abandoned
transactions. The leader `TxnStatusManager` keeps track of the last heartbeat for each transaction
in-memory only, making heartbeating a relatively lightweight operation.
Each tablet server has a background thread that periodically goes through each `TxnStatusManager`
hosted on the server, and aborts all transactions that have not received a heartbeat within a
configured interval. Only transactions in the `OPEN` state are automatically aborted.
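
A minimal sketch of the staleness check follows. `TxnEntry` and `FindStaleTxns` are hypothetical names, and the sketch assumes that the last heartbeat time is kept in an in-memory map keyed by transaction ID.

[source,c++]
----
#include <chrono>
#include <cstdint>
#include <map>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical in-memory record kept per transaction by the leader.
struct TxnEntry {
  bool is_open;                      // Only OPEN transactions are eligible.
  Clock::time_point last_heartbeat;  // Tracked in memory only.
};

// Returns the IDs of OPEN transactions whose last heartbeat is older than
// 'max_staleness' as of 'now'.
inline std::vector<int64_t> FindStaleTxns(
    const std::map<int64_t, TxnEntry>& txns,
    Clock::time_point now,
    Clock::duration max_staleness) {
  std::vector<int64_t> stale;
  for (const auto& kv : txns) {
    const TxnEntry& entry = kv.second;
    if (entry.is_open && now - entry.last_heartbeat > max_staleness) {
      stale.push_back(kv.first);
    }
  }
  return stale;
}
----

A background thread could call such a function periodically and issue an abort for each returned ID.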
== Transaction participants
=== Transaction state machine
Transaction participants keep track of local transaction state, ensuring that transactional writes
are only accepted if the transaction has not yet been committed or aborted. To do this, participants
persistently (i.e. via Raft replication) keep track of the transaction state, described below:
* `kInitializing`: the transaction has yet to be fully initialized. Replication may be in progress,
but we cannot consider the transaction as fully open yet.
* `kOpen`: the transaction is available for writes.
* `kCommitInProgress`: the `TxnStatusManager` has signaled to the participant that the transaction
should begin committing. The transaction can no longer accept new writes.
* `kCommitted`: the `TxnStatusManager` has finalized the commit. Transactional
rows should be visible to clients.
* `kAborted`: the `TxnStatusManager` has begun aborting the transaction.
The following state transitions are expected:

          BEGIN_TXN  BEGIN_COMMIT        FINALIZE_COMMIT
 kInitializing --> kOpen --> kCommitInProgress --> kCommitted

          BEGIN_TXN   ABORT_TXN
 kInitializing --> kOpen --> kAborted

As orchestration RPCs are replicated on the participant, these transitions are applied to the
underlying tablet, updating an in-memory registry of `TxnMetadata` objects keyed by transaction ID,
and persisting the states with the tablet metadata.
// TODO: point to the change that suggests removing metadata once all in-memory stores are flushed.
=== Registering participants
Before a participant is able to participate in a transaction, it must register itself with the
`TxnStatusManager`, and replicate an op that demarcates the beginning of the transaction on all
replicas of the participant.
To ensure this happens, when a transactional write is first received by a leader participant, it
first checks to see if it has already completed these preparatory steps. It does so by keeping an
in-memory map of “dispatchers”, one per active transaction. A participant’s dispatcher caches
whether or not the participant has completed the steps, and if so, simply lets the write through to
the prepare pool.
Otherwise it keeps the write request in a queue temporarily, using the `TxnSystemClient` to
asynchronously register itself as a participant with the `TxnStatusManager`, and then replicate the
op to begin the transaction. Once the request is complete, the queued write requests are submitted
to the prepare threadpool. If either step returns an error, the error is passed back to the writer,
ensuring that retriable errors (e.g. leadership changes) lead to the transactional write op being
retried, and non-retriable errors (e.g. invalid transaction ID) are shown to users.
Once the transaction has begun committing on the participant, or begun aborting, the transaction’s
dispatcher is unregistered. Further attempts to write to the transaction may instantiate a new
dispatcher, but the attempt to register the participant will fail, given the registration will be
rejected by the `TxnStatusManager`.
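
The queueing behavior of a dispatcher can be sketched as follows. `WriteDispatcher` is a hypothetical, simplified stand-in: it models each write as a callback that receives the outcome of the preparatory steps, and it leaves out the code that actually kicks off registration and the begin-transaction op.

[source,c++]
----
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical per-transaction dispatcher on a leader participant.
class WriteDispatcher {
 public:
  // A write is modeled as a callback that is told whether preparation succeeded.
  using Write = std::function<void(bool ok)>;

  // Runs 'write' now if the preparatory steps are done; otherwise queues it.
  void Dispatch(Write write) {
    {
      std::lock_guard<std::mutex> l(lock_);
      if (!prepared_) {
        queue_.push_back(std::move(write));
        return;
      }
    }
    write(true);
  }

  // Called once registration and the begin-transaction op have finished.
  // On success the queued writes proceed; on failure each one sees the error.
  void OnPreparationDone(bool ok) {
    std::vector<Write> pending;
    {
      std::lock_guard<std::mutex> l(lock_);
      prepared_ = ok;
      pending.swap(queue_);
    }
    for (auto& w : pending) {
      w(ok);
    }
  }

 private:
  std::mutex lock_;
  bool prepared_ = false;
  std::vector<Write> queue_;
};
----

Running the callbacks outside the lock keeps a slow write from blocking other writers that are trying to enqueue.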
=== Participant commit and MVCC
The “commit” condition is different for transactions than regular write ops, which only need to be
considered “applied” to be visible to users. The goals for commit are:
* Stop accepting writes once a `BEGIN_COMMIT` op has been replicated on a participant.
* Only show results that have been committed, as indicated by the replication of a `FINALIZE_COMMIT`
op on a participant, which contains a commit timestamp selected by the `TxnStatusManager` after
receiving all `BEGIN_COMMIT` op responses.
To accomplish this, when finishing the replication of a `BEGIN_COMMIT` op, rather than demarcating
the associated MVCC op as completed, the Txn keeps track of the `BEGIN_COMMIT` MVCC op timestamp,
allowing a `FINALIZE_COMMIT` op to complete replicating, and then mark the `BEGIN_COMMIT` MVCC op as
applied.
The `TxnStatusManager` selects the commit timestamp to be the highest of all participants’
`BEGIN_COMMIT` op timestamps, so a transaction’s commit timestamp is at least as high as the
`BEGIN_COMMIT` timestamp on every participant. As a result, when serving a scan at time t, it is
sufficient to wait until all ops before t are applied.
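
The timestamp selection amounts to taking a maximum, as in the minimal sketch below. `SelectCommitTimestamp` is a hypothetical helper, and timestamps are modeled as plain 64-bit integers.

[source,c++]
----
#include <algorithm>
#include <cstdint>
#include <vector>

// Returns the commit timestamp for a transaction, given the BEGIN_COMMIT
// timestamp reported by each participant. Assumes at least one participant.
inline uint64_t SelectCommitTimestamp(
    const std::vector<uint64_t>& begin_commit_timestamps) {
  return *std::max_element(begin_commit_timestamps.begin(),
                           begin_commit_timestamps.end());
}
----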
When reading rows, the commit condition changes depending on the kind of snapshot we are using:
* *Timestamp (as in `READ_AT_SNAPSHOT`, `READ_YOUR_WRITES`, diff scan)*: The transaction is
considered committed if the `TxnMetadata` has both a `BEGIN_COMMIT` op that has been applied and a
commit timestamp that falls within the range of the snapshot.
* *Latest (as in `READ_LATEST`)*: The transaction is considered committed if the `TxnMetadata` has a
`BEGIN_COMMIT` op that has been applied, since we only apply the `BEGIN_COMMIT` op after
replicating the `FINALIZE_COMMIT` op.
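
The two commit conditions can be expressed as simple predicates, sketched below. `TxnMetadataView` is a hypothetical, simplified stand-in for `TxnMetadata`, and "falls within the range of the snapshot" is modeled, as an assumption, as the commit timestamp being strictly less than the snapshot timestamp.

[source,c++]
----
#include <cstdint>

// Hypothetical, simplified view of a participant's per-transaction metadata.
struct TxnMetadataView {
  bool begin_commit_applied;  // BEGIN_COMMIT MVCC op has been marked applied.
  bool has_commit_timestamp;  // Set once FINALIZE_COMMIT has been replicated.
  uint64_t commit_timestamp;  // Meaningful only if has_commit_timestamp.
};

// Latest-style check: an applied BEGIN_COMMIT op is sufficient, because it is
// only applied after the FINALIZE_COMMIT op has been replicated.
inline bool IsCommittedLatest(const TxnMetadataView& m) {
  return m.begin_commit_applied;
}

// Timestamp-style check: additionally require the commit timestamp to fall
// within the snapshot (assumed here to mean "strictly before snapshot_ts").
inline bool IsCommittedAt(const TxnMetadataView& m, uint64_t snapshot_ts) {
  return m.begin_commit_applied && m.has_commit_timestamp &&
         m.commit_timestamp < snapshot_ts;
}
----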
=== Transactional MemRowSets
In addition to the single MRS that tablets traditionally have, tablets now maintain a map of
uncommitted transaction ID to dirty MRS, and a set of committed MRSs that were inserted as a part of
transactions. Each such MRS has a shared reference to a `TxnMetadata` instance that is maintained as
a part of `TabletMetadata`.
When a transaction is begun on a participant, a transactional MRS is created for it. Transactional
write ops first check all DRSs for row presence, then the main, non-transactional MRS, then finally
attempt to insert to the transactional MRS. As `FINALIZE_COMMIT` ops are applied, uncommitted MRSs
are moved to the set of committed MRSs.
When scanning through a transactional MRS and evaluating whether a base insert is relevant to a
given scan, Kudu checks whether the MRS’s `TxnMetadata` should be considered committed under the
given MVCC snapshot. Updates to the base inserts are evaluated as normal, wherein the visibility
condition is based on whether the updates have been applied (updates are not yet supported).
Transactional MemRowSets are not flushed to disk until they are fully committed, at which point the
memory usage of all committed MRSs is lumped together when considering whether to flush. When
flushing, all committed MRSs are taken to be the flush input, similar to a merge compaction, and
DRSs are written as though we were compacting multiple MRSs. When flushed, on-disk timestamps are
written as normal, using the rows’ commit timestamps, so that later scans do not need to consult
transaction metadata.
=== Locking and deadlock protection
Currently, per-transaction partition-level locking is supported, in tandem with per-op row-level
locking. Each tablet may participate in at most one transaction at a time.
To avoid deadlocks, Kudu uses a wait-die scheme. If a transaction attempts to acquire a partition
lock that is held by a transaction with a lower transaction ID, the requesting transaction aborts
itself: the tablet server sends a best-effort request to the `TxnStatusManager` to abort the
transaction, i.e. the caller transaction “dies”. If the requesting transaction instead has a lower
transaction ID than the lock holder, the op should be retried, i.e. the caller transaction “waits”.
To ensure rows are properly locked in the presence of both transactional and non-transactional
workloads, all non-transactional write ops also take the partition lock with the maximum transaction
ID. This means that all transactional write ops will wait for non-transactional writes to finish,
and non-transactional writes will abort in the presence of a multi-row transaction inserting rows
into the same tablet.
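
The locking rules above can be summarized as a decision function, shown here as a minimal sketch. `LockDecision`, `DecideLock`, and `kMaxTxnId` are hypothetical names, and treating a requester whose ID equals the holder's as allowed to proceed is an assumption rather than something this document specifies.

[source,c++]
----
#include <cstdint>
#include <limits>

enum class LockDecision { kAcquire, kWait, kDie };

// Assumed stand-in for the "maximum transaction ID" taken by non-transactional
// write ops.
constexpr int64_t kMaxTxnId = std::numeric_limits<int64_t>::max();

// Decides what a requester should do when asking for the partition lock.
// 'holder' is ignored when 'is_held' is false.
inline LockDecision DecideLock(int64_t requester, bool is_held, int64_t holder) {
  if (!is_held || holder == requester) {
    return LockDecision::kAcquire;
  }
  // The holder has a lower ID: the requester aborts ("dies").
  if (requester > holder) {
    return LockDecision::kDie;
  }
  // The requester has the lower ID: the op is retried ("waits").
  return LockDecision::kWait;
}
----

Because a non-transactional write uses `kMaxTxnId`, it always "dies" when a transaction holds the lock, and any transaction "waits" when a non-transactional write holds it.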
== Transactions client API
Both the C++ and Java clients leverage the existing session-based API that users have come to know.
With transactions, however, there is also the concept of transaction handles, which serve as the
initiators of transactions, the vehicle with which to create transactional sessions, the object
with which to orchestrate the commit or rollback of a transaction, and a means to signal activity on
a transaction via automatic heartbeating to prevent the `TxnStatusManager` from culling stale
transactions.
[source,c++]
----
shared_ptr<KuduTransaction> txn;
KUDU_RETURN_NOT_OK(client->NewTransaction(&txn));
shared_ptr<KuduSession> session;
KUDU_RETURN_NOT_OK(txn->CreateSession(&session));
// ... insert rows to 'session'
KUDU_RETURN_NOT_OK(session->Flush());
KUDU_RETURN_NOT_OK(txn->Commit());
----
[source,java]
----
KuduTransaction txn = client.newTransaction();
KuduSession session = txn.newKuduSession();
// ... insert rows to 'session'
session.flush();
txn.commit();
----
=== Heartbeating
Under the hood, each newly-created transaction handle, while kept in scope, also heartbeats through
to the `TxnStatusManagers` to signal activity for the transaction, preventing it from being culled
by the transaction staleness checkers. Client applications do not need to explicitly heartbeat.
=== Serialization
Given we expect there to be distributed actors that participate in a given transaction, Kudu also
exposes a way to transmit transaction handles across processes. Rather than exposing internal
details like the transaction ID, we serialize and deserialize a `TxnTokenPB` that contains metadata
about the transaction.
[source,c++]
----
string txn_token;
shared_ptr<KuduTransaction> txn;
KUDU_RETURN_NOT_OK(client->NewTransaction(&txn));
KUDU_RETURN_NOT_OK(txn->Serialize(&txn_token));
// ... pass 'txn_token' to another process, which rebuilds the handle:
shared_ptr<KuduTransaction> same_txn;
KUDU_RETURN_NOT_OK(KuduTransaction::Deserialize(client, txn_token, &same_txn));
----
[source,java]
----
KuduTransaction txn = client.newTransaction();
byte[] txn_token = txn.serialize();
KuduTransaction sameTxn = KuduTransaction.deserialize(txn_token, asyncClient);
----
Since we typically expect there to be a single driver of a transaction and multiple actors as
participants of the transaction, by default, deserialized transaction handles do not heartbeat.
The expectation is that the drivers will continue heartbeating until the transaction is complete.
This can be toggled by passing customized `SerializationOptions`.