| # PIP-298: Support read transaction buffer snapshot segments from earliest |
| |
| # Background |
| |
| In the implementation of the Pulsar Transaction, each topic is configured with a `Transaction Buffer` to prevent |
| consumers from reading uncommitted messages, which are invisible until the transaction is committed. Transaction Buffer |
| works with Position (maxReadPosition) and `TxnID` Set (aborts). The broker only dispatches messages, before the |
| maxReadPosition, to the consumers. When the broker dispatches the messages before maxReadPosition to the consumer, the |
| messages sent by aborted transactions will get filtered by the Transaction Buffer. |
| |
| # Motivation |
| |
| Currently, Pulsar transactions do not have configurable isolation levels. By introducing isolation level configuration |
| for consumers, we can enhance the flexibility of Pulsar transactions. |
| |
| Let's consider an example: |
| |
| **System**: Financial Transaction System |
| |
| **Operations**: Large volume of deposit and withdrawal operations, a |
| small number of transfer operations. |
| |
| **Roles**: |
| |
| - **Client A1** |
| - **Client A2** |
| - **User Account B1** |
| - **User Account B2** |
| - **Request Topic C** |
| - **Real-time Monitoring System D** |
| - **Business Processing System E** |
| |
| **Client Operations**: |
| |
| - **Withdrawal**: Client A1 decreases the deposit amount from User |
| Account B1 or B2. |
| - **Deposit**: Client A1 increases the deposit amount in User Account B1 or B2. |
| - **Transfer**: Client A2 decreases the deposit amount from User |
| Account B1 and increases it in User Account B2. Or vice versa. |
| |
| **Real-time Monitoring System D**: Obtains the latest data from |
| Request Topic C as quickly as possible to monitor transaction data and |
| changes in bank reserves in real-time. This is necessary for the |
| timely detection of anomalies and real-time decision-making. |
| |
| **Business Processing System E**: Reads data from Request Topic C, |
| then actually operates User Accounts B1, B2. |
| |
| **User Scenario**: Client A1 sends a large number of deposit and |
| withdrawal requests to Request Topic C. Client A2 writes a small |
| number of transfer requests to Request Topic C. |
| |
| In this case, Business Processing System E needs a read-committed |
| isolation level to ensure operation consistency and Exactly Once |
| semantics. The real-time monitoring system does not care if a small |
| number of transfer requests are incomplete (dirty data). What it |
| cannot tolerate is a situation where a large number of deposit and |
| withdrawal requests cannot be presented in real time due to a small |
| number of transfer requests (the current situation is that uncommitted |
| transaction messages can block the reading of committed transaction |
| messages). |
| |
| In this case, it is necessary to set different isolation levels for |
| different consumers/subscriptions. |
| The uncommitted transactions do not impact actual users' bank accounts. |
| Business Processing System E only reads committed transactional |
| messages and operates users' accounts. It needs Exactly-once semantic. |
| Real-time Monitoring System D reads uncommitted transactional |
| messages. It does not need Exactly-once semantic. |
| |
| They use different subscriptions and choose different isolation |
| levels. One needs transaction, one does not. |
| In general, multiple subscriptions of the same topic do not all |
| require transaction guarantees. |
| Some want low latency without the exact-once semantic guarantee, and |
| some must require the exactly-once guarantee. |
| We just provide a new option for different subscriptions. |
| |
| # Goal |
| |
| ## In Scope |
| |
| Implement Read Committed and Read Uncommitted isolation levels for Pulsar transactions. Allow consumers to configure |
| isolation levels during the building process. |
| |
| ## Out of Scope |
| |
| None. |
| |
| # High Level Design |
| |
| Add a configuration 'subscriptionIsolationLevel' in the consumer builder to allow users to choose different transaction |
| isolation levels. |
| |
| # Detailed Design |
| |
| ## Public-facing Changes |
| |
| Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read |
| Uncommitted. |
| |
| ### Before the Change |
| |
| The PulsarConsumer builder process currently does not include isolation level configurations. The consumer creation |
| process might look like this: |
| |
| ``` |
| PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); |
| |
| Consumer<String> consumer = client.newConsumer(Schema.STRING) |
| .topic("persistent://my-tenant/my-namespace/my-topic") |
| .subscriptionName("my-subscription") |
| .subscriptionType(SubscriptionType.Shared) |
| .subscribe(); |
| ``` |
| |
| ### After the Change |
| |
| Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read |
| Uncommitted. Introduce a new method subscriptionIsolationLevel() in the consumer builder, which accepts an enumeration |
| value representing the isolation level: |
| |
| ``` |
| public enum SubscriptionIsolationLevel { |
| // Consumer can only consume all transactional messages which have been committed. |
| READ_COMMITTED, |
| |
| // Consumer can consume all messages, even transactional messages which have been aborted. |
| READ_UNCOMMITTED; |
| } |
| ``` |
| |
| Then, modify the consumer creation process to include the new isolation level configuration: |
| |
| ``` |
| PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); |
| |
| Consumer<String> consumer = client.newConsumer(Schema.STRING) |
| .topic("persistent://my-tenant/my-namespace/my-topic") |
| .subscriptionName("my-subscription") |
| .subscriptionType(SubscriptionType.Shared) |
| .subscriptionIsolationLevel(SubscriptionIsolationLevel.READ_COMMITTED) // Adding the isolation level configuration |
| .subscribe(); |
| ``` |
| |
| With this change, users can now choose between Read Committed and Read Uncommitted isolation levels when creating a new |
| consumer. If the isolationLevel() method is not called during the builder process, the default isolation level will be |
| Read Committed. |
| Note that this is a subscription dimension configuration, and all consumers under the same subscription need to be |
| configured with the same IsolationLevel. |
| |
| ## Design & Implementation Details |
| |
| ### Client Changes |
| |
| Update the PulsarConsumer builder to accept isolation level configurations for Read Committed and Read Uncommitted levels. |
| |
| In order to achieve the above goals, the following modifications need to be made: |
| |
| - Added `IsolationLevel` related fields and methods in `ConsumerConfigurationData` and `ConsumerBuilderImpl` and `ConsumerImpl` |
| |
| - Modify PulsarApi.CommandSubscribe, add field -- IsolationLevel |
| |
| ``` |
| message CommandSubscribe { |
| |
| enum IsolationLevel { |
| READ_COMMITTED = 0; |
| READ_UNCOMMITTED = 1; |
| } |
| optional IsolationLevel isolation_level = 20 [default = READ_COMMITTED]; |
| } |
| ``` |
| |
| ### Broker changes |
| |
| Modify the transaction buffer and dispatching mechanisms to handle messages based on the chosen isolation level. |
| |
| In order to achieve the above goals, the following modifications need to be made: |
| |
| - Determine in the `readMoreEntries` method of Dispatchers such as `PersistentDispatcherSingleActiveConsumer` |
| and `PersistentDispatcherMultipleConsumers`: |
| |
| - If Subscription.isolationLevel == ReadCommitted, then MaxReadPosition = topic.getMaxReadPosition(), that is, |
| transactionBuffer.getMaxReadPosition() |
| |
| - If Subscription.isolationLevel == ReadUnCommitted, then MaxReadPosition = PositionImpl.LATEST |
| |
| - Add a new metrics `subscriptionIsolationLevel` in `SubscriptionStatsImpl`. |
| |
| # Monitoring |
| |
| After this PIP, Users can query the subscription stats of a topic through the admin tool, and observe the `subscriptionIsolationLevel` in the subscription stats to determine the isolation level of the subscription. |
| |
| # Links |
| |
| * Mailing List discussion thread: https://lists.apache.org/thread/8ny0qtp7m9qcdbvnfjdvpnkc4c5ssyld |
| * Mailing List voting thread: https://lists.apache.org/thread/4q1hrv466h8w9ccpf4moxt6jv1jxp1mr |
| * Document link: https://github.com/apache/pulsar-site/pull/712 |