JIRA: HUDI-6709
Modern data lake architectures often comprise numerous interconnected tables. Operations such as data backfill, upserts, deletes, or complex transformations may span multiple tables. In these scenarios, it's crucial that these operations are atomic, i.e., they either succeed across all tables or fail without partial writes. This ensures data consistency across the entire dataset. Users can design data workflows with the assurance that operations spanning multiple tables are treated as a single atomic unit.
Hudi has always emphasized transactional guarantees, ensuring data integrity and consistency for a specific table. Central to Hudi's approach to transactions is its timeline, which logs all actions (like commits, deltacommits, rollbacks, etc.) on the table. With the timeline as the source of truth for all changes on the table, Hudi employs tunable concurrency control to allow for concurrent reads and writes on the table with snapshot isolation. This is achieved by leveraging the timeline to determine the latest consistent snapshot of the table. Additionally, users can bring their own custom conflict resolution strategy by implementing the `ConflictResolutionStrategy` interface.
However, the current implementation cannot be extended as-is to support atomic operations across multiple tables. First, we need a notion of a “database” and its tables to be able to associate a transaction with multiple tables. Second, Hudi's timeline would need to evolve to account for changes across multiple tables, which introduces complexities in tracking and managing the order of operations across tables. With multiple tables involved, the points of potential failure increase; a failure in one table can cascade and affect the entire transaction. In case of failures, rolling back changes becomes more intricate in multi-table scenarios. Ensuring data consistency across tables during rollbacks, especially when there are inter-table dependencies, introduces additional challenges. Finally, the current concurrency control implementation is not designed to handle multi-table transactions. Hence, this RFC proposes a new design to support multi-table transactions.
First of all, let us discuss the goals and non-goals which will help in understanding the design considerations better.
The `TransactionManager`, `LockManager`, and `ConflictResolutionStrategy` APIs should work as they do today with a single table. Our primary challenge is to support operations that span multiple tables within a single database, and maintain the ACID properties across these tables.
Today we have an event log at the table level in the form of the Hudi timeline. To support multi-table transactions, we are going to need a unified view of all tables within the database to ensure atomicity and consistency. Hence, we propose a database-level timeline as the transaction log, which mainly contains, per transaction: a unique transaction ID, its start and end timestamps, the set of tables involved, and the transaction state (pending, committed, or rolled back).
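For illustration only, an entry on such a database timeline could be modeled as below; the names and shape are a sketch under the assumptions above, not a proposed on-disk format:

```scala
// Hypothetical states a multi-table transaction moves through on the database timeline.
object TxState extends Enumeration {
  val Requested, Inflight, Committed, RolledBack = Value
}

// One entry in the database-level transaction log.
case class DatabaseTimelineEntry(
    txId: String,                // unique transaction ID, e.g., a monotonic instant time
    startTs: String,             // start timestamp registered when the transaction begins
    endTs: Option[String],       // end timestamp, set at commit or rollback
    tablesInvolved: Set[String], // tables mutated by this transaction
    state: TxState.Value         // Requested -> Inflight -> Committed/RolledBack
)
```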
Hudi naturally supports MVCC-based snapshot isolation. We can leverage the Hudi table and database timelines to support snapshot isolation for multi-table transactions with concurrent readers and writers.
Anomalies or Conflicts
With MVCC and snapshot isolation:
Dirty Read
Definition: A transaction reads data that's been written by a still in-progress transaction.
Analysis with MVCC: Dirty reads are inherently prevented in MVCC with snapshot isolation. Since every transaction is reading from its snapshot, it won't see uncommitted changes made by other transactions.
Phantom Read
Definition: In the course of a transaction, new records get added or old records get removed that fit the criteria of a previous read in the transaction. See Appendix for an example.
Analysis with MVCC: Since transactions operate on snapshots, they won't see new records added after the snapshot was taken. However, if a transaction's intent is to ensure that new records of a certain kind don't get added, additional mechanisms, like predicate locks, might be needed. If we don't allow the snapshot to be refreshed within the same transaction, then phantom reads are not possible (including self-joins).
Read Skew
Definition: Given a scenario of a concurrent read happening while transactions are ongoing: a transaction reads multiple items, but by the time it finishes reading, another transaction has modified some of the earlier-read items, resulting in an inconsistent view.
Analysis with MVCC: The data that a transaction reads remains consistent for the duration of the transaction. With MVCC and snapshot isolation, the read operation will see a consistent snapshot of the database, depending on when the read started. It won't see the uncommitted changes made by the ongoing transactions. So, the read is consistent, and there's no anomaly related to the read operation in this setup.
Lost Update / Write-Write Conflict
Definition: Two transactions read the same data, and based on the read, they both decide to modify the data, leading to a situation where the final outcome might not be consistent with the intent of either transaction.
Analysis with MVCC: This is a potential problem even with snapshot isolation. Let's say two transactions, T1 and T2, start at the same time and read the same file in one of the tables. Both decide to modify the file based on what they've read. Since they are operating on snapshots, neither transaction sees the other's changes. When they try to commit, they might both try to create a new version of the file, leading to a write-write conflict. The system, noticing the conflict, would typically abort one of the transactions or apply some conflict resolution strategy.
Conflict detection will happen on the basis of the set of partitions and fileIDs mutated by the transaction. Let's say there are two conflicting transactions, T1 and T2. When they start, both fetch the latest versions of all tables and register themselves with a start timestamp in the database timeline, which will be greater than all versions of the tables involved in T1 and T2. To handle a conflict before committing, we have the following options (a sketch of the detection and resolution step follows the list):
1. **First committer / younger transaction wins**: essentially, no resolution is required with end-timestamp-based ordering. If a transaction tries to modify data that has been modified by a younger transaction (with a later timestamp), the older transaction is rolled back to avoid the conflict. This ensures that transactions are serialized based on their timestamps.
2. **OCC (with lock provider)**: with start and end timestamps, we have the option of the "wait-die" and "wound-wait" strategies (CMU notes on 2PL, section 3). In the wait-die strategy, if an older transaction requests a lock held by a younger one, the older transaction is forced to wait or is aborted ("dies") after some time. In the wound-wait scheme, the younger transaction is aborted ("wounded") to allow the older transaction to proceed.
3. **Compensating transaction**: instead of aborting a transaction when a conflict is detected, another transaction is executed based on the latest snapshot to compensate for the effects of the conflicting transaction.
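As a sketch of the detection step (the types and helpers below are hypothetical, not Hudi's actual conflict-resolution classes): two transactions conflict exactly when the sets of (partition, fileID) pairs they mutate overlap, and a strategy such as wound-wait then decides which one is aborted.

```scala
// Hypothetical summary of what a transaction mutated: pairs of (partition, fileId).
case class TxFootprint(txId: String, startTs: Long, mutatedFiles: Set[(String, String)])

// Conflict exists iff the two transactions touched at least one common file group.
def conflicts(a: TxFootprint, b: TxFootprint): Boolean =
  a.mutatedFiles.intersect(b.mutatedFiles).nonEmpty

// Wound-wait: if the requester is older (smaller start timestamp), the younger holder
// is aborted ("wounded"); otherwise the younger requester backs off and retries.
def txToAbort(requester: TxFootprint, holder: TxFootprint): Option[String] =
  if (!conflicts(requester, holder)) None
  else if (requester.startTs < holder.startTs) Some(holder.txId)
  else Some(requester.txId)
```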
Let's break down the multi-table transaction protocol using the scenarios presented below (assuming we have a logical notion of a database and each database will have its own timeline, called the database timeline, that will track transactions across tables within that database).
Scenario 1: Basic Transaction with Updates to Three Tables

1. **Begin Transaction**: `W1` initiates a transaction. The database timeline logs the start of a new transaction with a unique transaction ID (`TxID1`).
2. **Updates to Tables**: `W1` writes updates to `t1`. The updates are staged and tracked under `TxID1` in `t1`'s timeline in a 'pending' state. `W1` writes updates to `t2` and `t3`. The updates are staged and tracked in the respective tables' timelines under `TxID1`.
3. **Commit Transaction**: `W1` signals the end of its transaction. The database timeline marks the transaction `TxID1` as committed. `t1`, `t2`, and `t3` finalize the 'pending' updates under `TxID1`.

Scenario 2: Transaction with a Conflict in One Table

1. **Begin Transactions**: `W1` initiates a transaction (`TxID1`). `W2` starts a new transaction (`TxID2`).
2. **Conflicting Updates**: `W1` and `W2` both try to write to a shared file in `t1`. `W2` tries to commit its changes; as `W1` has already staged its updates, `W2`'s changes to `t1` are aborted, and it receives a conflict notification.

Scenario 3: Transaction with Concurrent Reads During Pending Updates

1. **Begin Transaction**: `W1` initiates a transaction (`TxID1`) and stages updates for `t1` and `t2`. `t3`'s update is still in progress.
2. **Concurrent Read by Reader R1**: `R1` wishes to read from `t1`, `t2`, and `t3`. Since `W1`'s updates to `t1` and `t2` are only staged and the transaction is not yet committed, `R1` sees the data's previous state for these tables. For `t3`, `R1` reads the current state, as no updates have been staged or committed yet by `W1`.

Scenario 4: Transaction with Failed Update Requiring Rollback

1. **Begin Transaction**: `W1` initiates a transaction (`TxID1`) and successfully stages updates for `t1` and `t2`.
2. **Failed Update**: The update to `t3` fails due to an internal error, such as an I/O error.
3. **Rollback Transaction**: `W1` initiates a rollback for `TxID1`. The database timeline marks the transaction `TxID1` as rolled back. The staged updates to `t1` and `t2` under `TxID1` are discarded.

Note: This protocol ensures snapshot isolation by ensuring that readers always see a consistent state of the data, either before or after a transaction's updates but never a mix of the two.
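To make the reader-side guarantee concrete, here is a minimal sketch (hypothetical helpers, reusing the `DatabaseTimelineEntry` model sketched earlier, not actual Hudi APIs) of how a reader like `R1` could resolve a consistent snapshot: a table-level action is visible only if its owning transaction has committed on the database timeline.

```scala
// Transactions that have reached the Committed state on the database timeline.
def committedTxIds(timeline: Seq[DatabaseTimelineEntry]): Set[String] =
  timeline.filter(_.state == TxState.Committed).map(_.txId).toSet

// A reader includes a table action only if its transaction committed, so it sees either
// all of TxID1's updates (after commit) or none of them (while 'pending'), never a mix.
def visibleInstants(tableActions: Seq[(String, String)], // (txId, instantTime) per action
                    timeline: Seq[DatabaseTimelineEntry]): Seq[String] = {
  val committed = committedTxIds(timeline)
  tableActions.collect { case (txId, instant) if committed.contains(txId) => instant }
}
```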
At the API level, the user-facing flow could look like the following:

```
BEGIN tx1
  // anything done here is associated with tx1.
  // load table A; (load A's latest snapshot time into HoodieCatalog/driver memory)
  // load table B; (load B's latest snapshot time into HoodieCatalog/driver memory)
  // load table A again; (it will reuse the snapshot time already in HoodieCatalog/driver memory).
COMMIT / ROLLBACK
```
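A minimal sketch of the snapshot pinning described in the comments above (hypothetical class name; the real state would live in `HoodieCatalog`/driver memory):

```scala
import scala.collection.mutable

// Per-transaction cache of table snapshot times held in driver memory.
class TransactionSnapshotCache {
  private val pinned = mutable.Map[String, String]()

  // The first load of a table pins its latest snapshot time; subsequent loads within
  // the same transaction reuse the pinned time, so the transaction reads one snapshot.
  def snapshotFor(tableName: String, latestSnapshotTime: => String): String =
    pinned.getOrElseUpdate(tableName, latestSnapshotTime)
}
```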
We propose to extend `HoodieCatalog` in Spark with a `TransactionalHoodieCatalog` that implements the Spark `TableCatalog` interface while adding multi-table transaction support through `beginTransaction`, `commitTransaction`, etc., methods. We must make the `createTable` and `loadTable` methods transaction-aware. Similarly, other methods would also need transaction-awareness where necessary. Below is a sketch based on the `TableCatalog` interface of Spark.
interface of Spark.abstract class TransactionalHoodieCatalog extends TableCatalog { val spark: SparkSession = SparkSession.active // Representing a Transaction case class Transaction(transactionId: String, tablesInvolved: mutable.Set[String]) // State private var currentTransaction: Option[Transaction] = None // Databases and Tables - in-memory representation for this sketch private val databases: mutable.Map[String, CatalogDatabase] = mutable.Map() private val tables: mutable.Map[String, CatalogTable] = mutable.Map() // Transactional methods def beginTransaction(): String = { val newTransactionId = HoodieActiveTimeline.createNewInstantTime() currentTransaction = Some(Transaction(newTransactionId, mutable.Set())) newTransactionId } def addTableToTransaction(tableName: String): Unit = { currentTransaction match { case Some(tx) => tx.tablesInvolved += tableName case None => throw new IllegalStateException("No active transaction to add table to.") } } def commitTransaction(): Unit = { // Commit logic currentTransaction = None } def rollbackTransaction(): Unit = { // Rollback logic currentTransaction = None } // Implementation of Catalog methods // Database related operations override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = ??? override def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = ??? override def alterDatabase(dbDefinition: CatalogDatabase): Unit = ??? override def getDatabase(db: String): CatalogDatabase = { databases.getOrElse(db, throw new NoSuchDatabaseException(db)) } override def databaseExists(db: String): Boolean = { databases.contains(db) } override def listDatabases(): Seq[String] = { databases.keys.toSeq } override def listDatabases(pattern: String): Seq[String] = { databases.keys.filter(_.matches(pattern)).toSeq } // ... More database methods // Table related operations override def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = { // If there's an ongoing transaction, associate the table with it currentTransaction.foreach(tx => tx.tablesInvolved += tableDefinition.identifier.table) // Rest of the creation logic tables += (tableDefinition.identifier.table -> tableDefinition) } override def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit = ??? override def loadTable(db: String, table: String, loadPath: String, isOverwrite: Boolean, isSrcLocal: Boolean): Unit = { tables.getOrElse(table, throw new NoSuchTableException(db, table)) } }
Spark's `TableCatalog` API is a DataSource V2 API. So, this limits us to supporting multi-table transactions only with DataSource V2 while writing. For reading, we will fall back to V1, just like we do in `HoodieCatalog` today.
1. A `BeginMultiTableTransactionCommand` will be invoked when the parser determines the query to be a BEGIN...END block. This command will initiate the transaction using `beginTransaction` of the catalog.
2. Each write inside the block will be executed through the usual write path (e.g., `HoodieSparkSqlWriter`), and `catalog.addTableToTransaction` will be called before making any changes.
3. An `EndMultiTableTransactionCommand` will call `catalog.commitTransaction()` to persist all changes or `rollbackTransaction()` to discard them.

Hudi relations don't currently implement the DSv2 read API and only support the DSv2 write API. According to the LHBench paper:
> the DSv2 API is less mature and does not report some metrics useful in query planning, so this often results in less performant query plans over Iceberg. For example, in Q9, Spark optimizes a complex aggregation with a cross-join in Delta Lake and Hudi but not in Iceberg, leading to the largest relative performance difference in all of TPC-DS.
Also, for writing, Hudi falls back to DSv1 (except for schema on read).
Why fall back? While DataFrame shuffles are largely determined by Spark's logical and physical planning stages, DSv2 has interfaces (like `SupportsRead`, `SupportsWrite`, and their sub-interfaces) that allow for better pushdown and optimization. Properly leveraging these can reduce the need for shuffles, but the API itself doesn't directly control shuffling. If we want to implement custom shuffling logic as part of a custom data source, DSv2 doesn't directly provide specific interfaces for that. DSv2 allows data sources to report their physical distribution (like bucketing or partitioning) via the `SupportsReportPartitioning` trait. When Spark is aware of the data's distribution and partitioning, it might avoid unnecessary shuffles. However, if the data source does not correctly implement or report these characteristics, it could lead to suboptimal execution plans. So, the onus is partially on the data source implementation to provide accurate information.
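As an illustration of that reporting path (a hedged sketch against Spark's DSv2 `Scan` interfaces; the clustering key and partition count are made-up placeholders, not Hudi's actual layout):

```scala
import org.apache.spark.sql.connector.expressions.{Expression, Expressions}
import org.apache.spark.sql.connector.read.{Scan, SupportsReportPartitioning}
import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}
import org.apache.spark.sql.types.StructType

// A scan that reports how its output is already clustered, so Spark can plan
// joins/aggregations on the reported keys without introducing an extra shuffle.
class ReportingScan(schema: StructType, numFileGroups: Int)
    extends Scan with SupportsReportPartitioning {

  override def readSchema(): StructType = schema

  // Claim that rows are grouped by a (hypothetical) record key column.
  override def outputPartitioning(): Partitioning =
    new KeyGroupedPartitioning(Array[Expression](Expressions.identity("record_key")), numFileGroups)
}
```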
We can follow the same approach, i.e., in order to support multi-table transactions with DSv2, we can create the v2 table in the transactional catalog but fall back to `V1Write`.
```scala
// similar to catalog.beginTransaction
val transaction = HoodieTransactionManager.beginTransaction()

// similar to catalog.addTableToTransaction and execute the write.
// Then pass the control to HoodieTransactionManager.
df1.write.format("hudi").withTransaction(transaction).options(opts1).save(basePath1)

// Same as above. If this failed, then rollback and pass the control to HoodieTransactionManager.
// HoodieTransactionManager will rollback all previous transactions associated with transaction id.
df2.write.format("hudi").withTransaction(transaction).options(opts2).save(basePath2)

// All success, go ahead and commit or finish rollback (ROLLED_BACK state on timeline is written in that case)
transaction.commit()
```
The design is premised on certain format changes to the timeline. Users can use the feature on new tables created with the upgraded version. Older datasets that were written without the notion of a database or a database timeline should still be readable. However, users who want to leverage the new multi-table transaction capabilities with the existing datasets would ideally need a smooth migration path. This could involve tools or scripts that help in restructuring data or migrating the older timeline events into the new database timeline.
No need to phase out the older behavior. The new behavior will be activated explicitly by the use of new APIs or SQL syntax.
TODO
Suppose you have a table named `Employees` with columns `employee_id` and `manager_id`, where the `manager_id` for some employees refers to the `employee_id` of other employees in the same table. One might execute a self-join to retrieve a list of employees and their managers:
```sql
SELECT e1.employee_id, e2.employee_id AS manager_id
FROM Employees e1
JOIN Employees e2 ON e1.manager_id = e2.employee_id;
```
This query essentially matches each employee with their manager, using a self-join on the `Employees` table.
1. **Transaction A** starts and runs the self-join query to get a list of employees and their respective managers.
2. While **Transaction A** is still in progress, **Transaction B** starts and adds a new row to the `Employees` table, inserting a new employee with a manager whose `employee_id` is already in the table.
3. **Transaction B** commits its changes.
4. If **Transaction A** re-runs the same self-join query, it might see the newly added row, resulting in an additional result in the join output that wasn't there during the initial query. This is a phantom read.
With MVCC and the snapshot isolation level, a transaction would continue to see the state of the database as it was when the transaction started, even if it re-runs the self-join. This level prevents the phantom read in this case. However, it cannot be guaranteed with read committed.