blob: 8a24c46429a26a9b9e870395e3a3d668c82ad09f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
pb ""
type subscription struct {
topic string
subscription string
type transaction struct {
txnID TxnID
state TxnState
tcClient *transactionCoordinatorClient
registerPartitions map[string]bool
registerAckSubscriptions map[subscription]bool
// opsFlow It has two effects:
// 1. Wait all the operations of sending and acking messages with the transaction complete
// by reading msg from the chan.
// 2. Prevent sending or acking messages with a committed or aborted transaction.
// opsCount is record the number of the uncompleted operations.
// opsFlow
// Write:
// 1. When the transaction is created, a bool will be written to opsFlow chan.
// 2. When the opsCount decrement from 1 to 0, a new bool will be written to opsFlow chan.
// 3. When get a retryable error after committing or aborting the transaction,
// a bool will be written to opsFlow chan.
// Read:
// 1. When the transaction is committed or aborted, a bool will be read from opsFlow chan.
// 2. When the opsCount increment from 0 to 1, a bool will be read from opsFlow chan.
opsFlow chan bool
opsCount int32
opTimeout time.Duration
log log.Logger
func newTransaction(id TxnID, tcClient *transactionCoordinatorClient, timeout time.Duration) *transaction {
transaction := &transaction{
txnID: id,
state: TxnOpen,
registerPartitions: make(map[string]bool),
registerAckSubscriptions: make(map[subscription]bool),
opsFlow: make(chan bool, 1),
opTimeout: 5 * time.Second,
tcClient: tcClient,
//This means there are not pending requests with this transaction. The transaction can be committed or aborted.
transaction.opsFlow <- true
go func() {
//Set the state of the transaction to timeout after timeout
atomic.CompareAndSwapInt32((*int32)(&transaction.state), int32(TxnOpen), int32(TxnTimeout))
transaction.log = tcClient.log.SubLogger(log.Fields{})
return transaction
func (txn *transaction) GetState() TxnState {
return txn.state
func (txn *transaction) Commit(ctx context.Context) error {
if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), int32(TxnCommitting)) ||
txn.state == TxnCommitting) {
return newError(InvalidStatus, "Expect transaction state is TxnOpen but "+txn.state.string())
//Wait for all operations to complete
select {
case <-txn.opsFlow:
case <-time.After(txn.opTimeout):
return newError(TimeoutError, "There are some operations that are not completed after the timeout.")
//Send commit transaction command to transaction coordinator
err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_COMMIT)
if err == nil {
atomic.StoreInt32((*int32)(&txn.state), int32(TxnCommitted))
} else {
if err.(*Error).Result() == TransactionNoFoundError || err.(*Error).Result() == InvalidStatus {
atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
return err
txn.opsFlow <- true
return err
func (txn *transaction) Abort(ctx context.Context) error {
if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), int32(TxnAborting)) ||
txn.state == TxnAborting) {
return newError(InvalidStatus, "Expect transaction state is TxnOpen but "+txn.state.string())
//Wait for all operations to complete
select {
case <-txn.opsFlow:
case <-time.After(txn.opTimeout):
return newError(TimeoutError, "There are some operations that are not completed after the timeout.")
//Send abort transaction command to transaction coordinator
err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_ABORT)
if err == nil {
atomic.StoreInt32((*int32)(&txn.state), int32(TxnAborted))
} else {
if err.(*Error).Result() == TransactionNoFoundError || err.(*Error).Result() == InvalidStatus {
atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
} else {
txn.opsFlow <- true
return err
func (txn *transaction) registerSendOrAckOp() error {
if atomic.AddInt32(&txn.opsCount, 1) == 1 {
//There are new operations that not completed
select {
case <-txn.opsFlow:
return nil
case <-time.After(txn.opTimeout):
if _, err := txn.checkIfOpen(); err != nil {
return err
return newError(TimeoutError, "Failed to get the semaphore to register the send/ack operation")
return nil
func (txn *transaction) endSendOrAckOp(err error) {
if err != nil {
atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
if atomic.AddInt32(&txn.opsCount, -1) == 0 {
//This means there are not pending send/ack requests
txn.opsFlow <- true
func (txn *transaction) registerProducerTopic(topic string) error {
isOpen, err := txn.checkIfOpen()
if !isOpen {
return err
_, ok := txn.registerPartitions[topic]
if !ok {
defer txn.Unlock()
if _, ok = txn.registerPartitions[topic]; !ok {
err := txn.tcClient.addPublishPartitionToTxn(&txn.txnID, []string{topic})
if err != nil {
return err
txn.registerPartitions[topic] = true
return nil
func (txn *transaction) registerAckTopic(topic string, subName string) error {
isOpen, err := txn.checkIfOpen()
if !isOpen {
return err
sub := subscription{
topic: topic,
subscription: subName,
_, ok := txn.registerAckSubscriptions[sub]
if !ok {
defer txn.Unlock()
if _, ok = txn.registerAckSubscriptions[sub]; !ok {
err := txn.tcClient.addSubscriptionToTxn(&txn.txnID, topic, subName)
if err != nil {
return err
txn.registerAckSubscriptions[sub] = true
return nil
func (txn *transaction) GetTxnID() TxnID {
return txn.txnID
func (txn *transaction) checkIfOpen() (bool, error) {
if txn.state == TxnOpen {
return true, nil
return false, newError(InvalidStatus, "Expect transaction state is TxnOpen but "+txn.state.string())
func (state TxnState) string() string {
switch state {
case TxnOpen:
return "TxnOpen"
case TxnCommitting:
return "TxnCommitting"
case TxnAborting:
return "TxnAborting"
case TxnCommitted:
return "TxnCommitted"
case TxnAborted:
return "TxnAborted"
case TxnTimeout:
return "TxnTimeout"
case TxnError:
return "TxnError"
return "Unknown"