blob: 2eb8aca9a82cfbb63f1ebe8b5d34e74a3e810b9f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"context"
"sync"
"sync/atomic"
"time"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
)
type subscription struct {
topic string
subscription string
}
type transaction struct {
sync.Mutex
txnID TxnID
state TxnState
tcClient *transactionCoordinatorClient
registerPartitions map[string]bool
registerAckSubscriptions map[subscription]bool
// opsFlow It has two effects:
// 1. Wait all the operations of sending and acking messages with the transaction complete
// by reading msg from the chan.
// 2. Prevent sending or acking messages with a committed or aborted transaction.
// opsCount is record the number of the uncompleted operations.
// opsFlow
// Write:
// 1. When the transaction is created, a bool will be written to opsFlow chan.
// 2. When the opsCount decrement from 1 to 0, a new bool will be written to opsFlow chan.
// 3. When get a retryable error after committing or aborting the transaction,
// a bool will be written to opsFlow chan.
// Read:
// 1. When the transaction is committed or aborted, a bool will be read from opsFlow chan.
// 2. When the opsCount increment from 0 to 1, a bool will be read from opsFlow chan.
opsFlow chan bool
opsCount int32
opTimeout time.Duration
log log.Logger
}
func newTransaction(id TxnID, tcClient *transactionCoordinatorClient, timeout time.Duration) *transaction {
transaction := &transaction{
txnID: id,
state: TxnOpen,
registerPartitions: make(map[string]bool),
registerAckSubscriptions: make(map[subscription]bool),
opsFlow: make(chan bool, 1),
opTimeout: 5 * time.Second,
tcClient: tcClient,
}
//This means there are not pending requests with this transaction. The transaction can be committed or aborted.
transaction.opsFlow <- true
go func() {
//Set the state of the transaction to timeout after timeout
<-time.After(timeout)
atomic.CompareAndSwapInt32((*int32)(&transaction.state), int32(TxnOpen), int32(TxnTimeout))
}()
transaction.log = tcClient.log.SubLogger(log.Fields{})
return transaction
}
func (txn *transaction) GetState() TxnState {
return txn.state
}
func (txn *transaction) Commit(ctx context.Context) error {
if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), int32(TxnCommitting)) ||
txn.state == TxnCommitting) {
return newError(InvalidStatus, "Expect transaction state is TxnOpen but "+txn.state.string())
}
//Wait for all operations to complete
select {
case <-txn.opsFlow:
case <-time.After(txn.opTimeout):
return newError(TimeoutError, "There are some operations that are not completed after the timeout.")
}
//Send commit transaction command to transaction coordinator
err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_COMMIT)
if err == nil {
atomic.StoreInt32((*int32)(&txn.state), int32(TxnCommitted))
} else {
if e, ok := err.(*Error); ok && (e.Result() == TransactionNoFoundError || e.Result() == InvalidStatus) {
atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
return err
}
txn.opsFlow <- true
}
return err
}
func (txn *transaction) Abort(ctx context.Context) error {
if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), int32(TxnAborting)) ||
txn.state == TxnAborting) {
return newError(InvalidStatus, "Expect transaction state is TxnOpen but "+txn.state.string())
}
//Wait for all operations to complete
select {
case <-txn.opsFlow:
case <-time.After(txn.opTimeout):
return newError(TimeoutError, "There are some operations that are not completed after the timeout.")
}
//Send abort transaction command to transaction coordinator
err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_ABORT)
if err == nil {
atomic.StoreInt32((*int32)(&txn.state), int32(TxnAborted))
} else {
if e, ok := err.(*Error); ok && (e.Result() == TransactionNoFoundError || e.Result() == InvalidStatus) {
atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
} else {
txn.opsFlow <- true
}
}
return err
}
func (txn *transaction) registerSendOrAckOp() error {
if atomic.AddInt32(&txn.opsCount, 1) == 1 {
//There are new operations that not completed
select {
case <-txn.opsFlow:
return nil
case <-time.After(txn.opTimeout):
if _, err := txn.checkIfOpen(); err != nil {
return err
}
return newError(TimeoutError, "Failed to get the semaphore to register the send/ack operation")
}
}
return nil
}
func (txn *transaction) endSendOrAckOp(err error) {
if err != nil {
atomic.StoreInt32((*int32)(&txn.state), int32(TxnError))
}
if atomic.AddInt32(&txn.opsCount, -1) == 0 {
//This means there are not pending send/ack requests
txn.opsFlow <- true
}
}
func (txn *transaction) registerProducerTopic(topic string) error {
isOpen, err := txn.checkIfOpen()
if !isOpen {
return err
}
_, ok := txn.registerPartitions[topic]
if !ok {
txn.Lock()
defer txn.Unlock()
if _, ok = txn.registerPartitions[topic]; !ok {
err := txn.tcClient.addPublishPartitionToTxn(&txn.txnID, []string{topic})
if err != nil {
return err
}
txn.registerPartitions[topic] = true
}
}
return nil
}
func (txn *transaction) registerAckTopic(topic string, subName string) error {
isOpen, err := txn.checkIfOpen()
if !isOpen {
return err
}
sub := subscription{
topic: topic,
subscription: subName,
}
_, ok := txn.registerAckSubscriptions[sub]
if !ok {
txn.Lock()
defer txn.Unlock()
if _, ok = txn.registerAckSubscriptions[sub]; !ok {
err := txn.tcClient.addSubscriptionToTxn(&txn.txnID, topic, subName)
if err != nil {
return err
}
txn.registerAckSubscriptions[sub] = true
}
}
return nil
}
func (txn *transaction) GetTxnID() TxnID {
return txn.txnID
}
func (txn *transaction) checkIfOpen() (bool, error) {
if txn.state == TxnOpen {
return true, nil
}
return false, newError(InvalidStatus, "Expect transaction state is TxnOpen but "+txn.state.string())
}
func (state TxnState) string() string {
switch state {
case TxnOpen:
return "TxnOpen"
case TxnCommitting:
return "TxnCommitting"
case TxnAborting:
return "TxnAborting"
case TxnCommitted:
return "TxnCommitted"
case TxnAborted:
return "TxnAborted"
case TxnTimeout:
return "TxnTimeout"
case TxnError:
return "TxnError"
default:
return "Unknown"
}
}