blob: 66a82ccad1f02a4634cc779ee39e864673544959 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
pb ""
func TestTCClient(t *testing.T) {
//1. Prepare: create PulsarClient and init transaction coordinator client.
topic := newTopicName()
sub := "my-sub"
tc, client := createTcClient(t)
//2. Prepare: create Topic and Subscription.
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
assert.NoError(t, err)
//3. Test newTransaction, addSubscriptionToTxn, addPublishPartitionToTxn
//Create a transaction1 and add subscription and publish topic to the transaction.
id1, err := tc.newTransaction(3 * time.Minute)
assert.NoError(t, err)
err = tc.addSubscriptionToTxn(id1, topic, sub)
assert.NoError(t, err)
err = tc.addPublishPartitionToTxn(id1, []string{topic})
assert.NoError(t, err)
//4. Verify the transaction1 stats
stats, err := transactionStats(id1)
assert.NoError(t, err)
assert.Equal(t, "OPEN", stats["status"])
producedPartitions := stats["producedPartitions"].(map[string]interface{})
ackedPartitions := stats["ackedPartitions"].(map[string]interface{})
_, ok := producedPartitions[topic]
assert.True(t, ok)
temp, ok := ackedPartitions[topic]
assert.True(t, ok)
subscriptions := temp.(map[string]interface{})
_, ok = subscriptions[sub]
assert.True(t, ok)
//5. Test End transaction
//Create transaction2 and Commit the transaction.
id2, err := tc.newTransaction(3 * time.Minute)
assert.NoError(t, err)
//6. Verify the transaction2 stats
stats2, err := transactionStats(id2)
assert.NoError(t, err)
assert.Equal(t, "OPEN", stats2["status"])
err = tc.endTxn(id2, pb.TxnAction_COMMIT)
assert.NoError(t, err)
stats2, err = transactionStats(id2)
//The transaction will be removed from txnMeta. Therefore, it is expected that stats2 is zero
if err == nil {
assert.Equal(t, "COMMITTED", stats2["status"])
} else {
assert.Equal(t, err.Error(), "http error status code: 404")
defer consumer.Close()
defer tc.close()
defer client.Close()
//Test points:
// 1. Abort and commit txn.
// 1. Do nothing, just open a transaction and then commit it or abort it.
// The operations of committing/aborting txn should success at the first time and fail at the second time.
// 2. The internal API, registerSendOrAckOp and endSendOrAckOp
// 1. Register 4 operation but only end 3 operations, the transaction can not be committed or aborted.
// 2. Register 3 operation and end 3 operation the transaction can be committed and aborted.
// 3. Register an operation and end the operation with an error,
// and then the state of the transaction should be replaced to errored.
// 3. The internal API, registerAckTopic and registerProducerTopic
// 1. Register ack topic and send topic, and call http request to get the stats of the transaction
// to do verification.
// TestTxnImplCommitOrAbort Test abort and commit txn
func TestTxnImplCommitOrAbort(t *testing.T) {
tc, _ := createTcClient(t)
//1. Open a transaction and then commit it.
//The operations of committing txn1 should success at the first time and fail at the second time.
txn1 := createTxn(tc, t)
err := txn1.Commit(context.Background())
require.Nil(t, err, fmt.Sprintf("Failed to commit the transaction %d:%d\n", txn1.txnID.MostSigBits,
txn1.state = TxnOpen
txn1.opsFlow <- true
err = txn1.Commit(context.Background())
assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError)
assert.Equal(t, txn1.GetState(), TxnError)
//2. Open a transaction and then abort it.
//The operations of aborting txn2 should success at the first time and fail at the second time.
id2, err := tc.newTransaction(time.Hour)
require.Nil(t, err, "Failed to new a transaction")
txn2 := newTransaction(*id2, tc, time.Hour)
err = txn2.Abort(context.Background())
require.Nil(t, err, fmt.Sprintf("Failed to abort the transaction %d:%d\n",
id2.MostSigBits, id2.LeastSigBits))
txn2.state = TxnOpen
txn2.opsFlow <- true
err = txn2.Abort(context.Background())
assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError)
assert.Equal(t, txn1.GetState(), TxnError)
err = txn2.registerSendOrAckOp()
assert.Equal(t, err.(*Error).Result(), InvalidStatus)
err = txn1.registerSendOrAckOp()
assert.Equal(t, err.(*Error).Result(), InvalidStatus)
// TestRegisterOpAndEndOp Test the internal API including the registerSendOrAckOp and endSendOrAckOp.
func TestRegisterOpAndEndOp(t *testing.T) {
tc, _ := createTcClient(t)
//1. Register 4 operation but only end 3 operations, the transaction can not be committed or aborted.
res := registerOpAndEndOp(t, tc, 4, 3, nil, true)
assert.Equal(t, res.(*Error).Result(), TimeoutError)
res = registerOpAndEndOp(t, tc, 4, 3, nil, false)
assert.Equal(t, res.(*Error).Result(), TimeoutError)
//2. Register 3 operation and end 3 operation the transaction can be committed and aborted.
res = registerOpAndEndOp(t, tc, 3, 3, nil, true)
assert.Nil(t, res)
res = registerOpAndEndOp(t, tc, 3, 3, nil, false)
assert.Nil(t, res)
//3. Register an operation and end the operation with an error,
// and then the state of the transaction should be replaced to errored.
res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), true)
assert.Equal(t, res.(*Error).Result(), InvalidStatus)
res = registerOpAndEndOp(t, tc, 4, 4, errors.New(""), false)
assert.Equal(t, res.(*Error).Result(), InvalidStatus)
// TestRegisterTopic Test the internal API, registerAckTopic and registerProducerTopic
func TestRegisterTopic(t *testing.T) {
//1. Prepare: create PulsarClient and init transaction coordinator client.
topic := newTopicName()
sub := "my-sub"
tc, client := createTcClient(t)
//2. Prepare: create Topic and Subscription.
_, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
assert.NoError(t, err)
txn := createTxn(tc, t)
//3. Create a topic and register topic and subscription.
err = txn.registerAckTopic(topic, sub)
require.Nil(t, err, "Failed to register ack topic.")
err = txn.registerProducerTopic(topic)
require.Nil(t, err, "Failed to register ack topic.")
//4. Call http request to get the stats of the transaction to do verification.
stats2, err := transactionStats(&txn.txnID)
assert.NoError(t, err)
topics := stats2["producedPartitions"].(map[string]interface{})
subTopics := stats2["ackedPartitions"].(map[string]interface{})
assert.NotNil(t, topics[topic])
assert.NotNil(t, subTopics[topic])
subs := subTopics[topic].(map[string]interface{})
assert.NotNil(t, subs[sub])
func registerOpAndEndOp(t *testing.T, tc *transactionCoordinatorClient, rp int, ep int, err error, commit bool) error {
txn := createTxn(tc, t)
for i := 0; i < rp; i++ {
err := txn.registerSendOrAckOp()
assert.Nil(t, err)
for i := 0; i < ep; i++ {
if commit {
err = txn.Commit(context.Background())
} else {
err = txn.Abort(context.Background())
return err
func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction {
id, err := tc.newTransaction(time.Hour)
require.Nil(t, err, "Failed to new a transaction.")
return newTransaction(*id, tc, time.Hour)
// createTcClient Create a transaction coordinator client to send request
func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) {
c, err := NewClient(ClientOptions{
URL: webServiceURLTLS,
TLSTrustCertsFilePath: caCertsPath,
Authentication: NewAuthenticationTLS(tlsClientCertPath, tlsClientKeyPath),
EnableTransaction: true,
require.Nil(t, err, "Failed to create client.")
return c.(*client).tcClient, c.(*client)
// TestConsumeAndProduceWithTxn is a test function that validates the behavior of producing and consuming
// messages with and without transactions. It consists of the following steps:
// 1. Prepare: Create a PulsarClient and initialize the transaction coordinator client.
// 2. Prepare: Create a topic and a subscription.
// 3. Produce 10 messages with a transaction and 10 messages without a transaction.
// - Expectation: The consumer should be able to receive the 10 messages sent without a transaction,
// but not the 10 messages sent with the transaction.
// 4. Commit the transaction and receive the remaining 10 messages.
// - Expectation: The consumer should be able to receive the 10 messages sent with the transaction.
// 5. Clean up: Close the consumer and producer instances.
// The test ensures that the consumer can only receive messages sent with a transaction after it is committed,
// and that it can always receive messages sent without a transaction.
func TestConsumeAndProduceWithTxn(t *testing.T) {
// Step 1: Prepare - Create PulsarClient and initialize the transaction coordinator client.
topic := newTopicName()
sub := "my-sub"
_, client := createTcClient(t)
// Step 2: Prepare - Create Topic and Subscription.
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
assert.NoError(t, err)
producer, _ := client.CreateProducer(ProducerOptions{
Topic: topic,
SendTimeout: 0,
// Step 3: Open a transaction, send 10 messages with the transaction and 10 messages without the transaction.
// Expectation: We can receive the 10 messages sent without a transaction and
// cannot receive the 10 messages sent with the transaction.
txn, err := client.NewTransaction(time.Hour)
require.Nil(t, err)
for i := 0; i < 10; i++ {
_, err = producer.Send(context.Background(), &ProducerMessage{
Payload: make([]byte, 1024),
require.Nil(t, err)
for i := 0; i < 10; i++ {
_, err := producer.Send(context.Background(), &ProducerMessage{
Transaction: txn,
Payload: make([]byte, 1024),
require.Nil(t, err)
// Attempt to receive and acknowledge the 10 messages sent without a transaction.
for i := 0; i < 10; i++ {
msg, _ := consumer.Receive(context.Background())
assert.NotNil(t, msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
// Create a goroutine to attempt receiving a message and send it to the 'done1' channel.
done1 := make(chan Message)
go func() {
msg, _ := consumer.Receive(context.Background())
err := consumer.AckID(msg.ID())
require.Nil(t, err)
// Expectation: The consumer should not receive uncommitted messages.
select {
case <-done1:
require.Fail(t, "The consumer should not receive uncommitted message")
case <-time.After(time.Second):
// Step 4: After committing the transaction, we should be able to receive the remaining 10 messages.
// Acknowledge the rest of the 10 messages with the transaction.
// Expectation: After committing the transaction, all messages of the subscription will be acknowledged.
_ = txn.Commit(context.Background())
txn, err = client.NewTransaction(time.Hour)
require.Nil(t, err)
for i := 0; i < 9; i++ {
msg, _ := consumer.Receive(context.Background())
require.NotNil(t, msg)
err = consumer.AckWithTxn(msg, txn)
require.Nil(t, err)
_ = txn.Commit(context.Background())
consumer, _ = client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
// Create a goroutine to attempt receiving a message and send it to the 'done1' channel.
done2 := make(chan Message)
go func() {
// Expectation: The consumer should not receive uncommitted messages.
select {
case <-done2:
require.Fail(t, "The consumer should not receive messages")
case <-time.After(time.Second):
// Step 5: Clean up - Close the consumer and producer instances.
func TestAckAndSendWithTxn(t *testing.T) {
// Prepare: Create PulsarClient and initialize the transaction coordinator client.
sourceTopic := newTopicName()
sinkTopic := newTopicName()
sub := "my-sub"
_, client := createTcClient(t)
// Prepare: Create source and sink topics and subscriptions.
sourceConsumer, _ := client.Subscribe(ConsumerOptions{
Topic: sourceTopic,
SubscriptionName: sub,
sourceProducer, _ := client.CreateProducer(ProducerOptions{
Topic: sourceTopic,
SendTimeout: 0,
sinkConsumer, _ := client.Subscribe(ConsumerOptions{
Topic: sinkTopic,
SubscriptionName: sub,
sinkProducer, _ := client.CreateProducer(ProducerOptions{
Topic: sinkTopic,
SendTimeout: 0,
// Produce 10 messages to the source topic.
for i := 0; i < 10; i++ {
_, err := sourceProducer.Send(context.Background(), &ProducerMessage{
Payload: make([]byte, 1024),
require.Nil(t, err)
// Open a transaction and consume messages from the source topic while sending messages to the sink topic.
txn, err := client.NewTransaction(time.Hour)
require.Nil(t, err)
for i := 0; i < 10; i++ {
msg, _ := sourceConsumer.Receive(context.Background())
require.NotNil(t, msg)
payload := msg.Payload()
_, err := sinkProducer.Send(context.Background(), &ProducerMessage{
Transaction: txn,
Payload: payload,
require.Nil(t, err)
err = sourceConsumer.AckWithTxn(msg, txn)
require.Nil(t, err)
// Commit the transaction.
_ = txn.Commit(context.Background())
// Verify that the messages are available in the sink topic.
for i := 0; i < 10; i++ {
msg, _ := sinkConsumer.Receive(context.Background())
require.NotNil(t, msg)
err = sinkConsumer.Ack(msg)
require.Nil(t, err)
// Clean up: Close consumers and producers.
func TestTransactionAbort(t *testing.T) {
// Prepare: Create PulsarClient and initialize the transaction coordinator client.
topic := newTopicName()
sub := "my-sub"
_, client := createTcClient(t)
// Prepare: Create Topic and Subscription.
consumer, _ := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
producer, _ := client.CreateProducer(ProducerOptions{
Topic: topic,
SendTimeout: 0,
// Open a transaction and send 10 messages with the transaction.
txn, err := client.NewTransaction(time.Hour)
require.Nil(t, err)
for i := 0; i < 10; i++ {
_, err = producer.Send(context.Background(), &ProducerMessage{
Transaction: txn,
Payload: make([]byte, 1024),
require.Nil(t, err)
// Abort the transaction.
_ = txn.Abort(context.Background())
// Expectation: The consumer should not receive any messages.
done := make(chan struct{})
go func() {
_, err := consumer.Receive(context.Background())
if err != nil {
select {
case <-done:
// The consumer should not receive any messages.
require.Fail(t, "The consumer should not receive any messages")
case <-time.After(time.Second):
// Clean up: Close the consumer and producer instances.