id: txn-use title: How to use transactions? sidebar_label: How to use transactions?

Transaction API

The transaction feature is primarily a server-side and protocol-level feature. You can use the transaction feature via the transaction API, which is available in Pulsar 2.8.0 or later.

To use the transaction API, you do not need any additional settings in the Pulsar client. By default, transactions is disabled.

Currently, transaction API is only available for Java clients. Support for other language clients will be added in the future releases.

Quick start

This section provides an example of how to use the transaction API to send and receive messages in a Java client.

  1. Start Pulsar 2.8.0 or later.

  2. Enable transaction.

    Change the configuration in the broker.conf file.

    transactionCoordinatorEnabled=true
    

    If you want to enable batch messages in transactions, follow the steps below.

    Set acknowledgmentAtBatchIndexLevelEnabled to true in the broker.conf or standalone.conf file.

    acknowledgmentAtBatchIndexLevelEnabled=true
    
  3. Initialize transaction coordinator metadata.

    The transaction coordinator can leverage the advantages of partitioned topics (such as load balance).

    Input

    bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone
    

    Output

    Transaction coordinator metadata setup success
    
  4. Initialize a Pulsar client.

    PulsarClient client = PulsarClient.builder()
    
    .serviceUrl(“pulsar://localhost:6650”)
    
    .enableTransaction(true)
    
    .build();
    

Now you can start using the transaction API to send and receive messages. Below is an example of a consume-process-produce application written in Java.

Let’s walk through this example step by step.

[1] Example of enabling batch messages ack in transactions in the consumer builder.

Consumer<byte[]> sinkConsumer = pulsarClient
    .newConsumer()
    .topic(transferTopic)
    .subscriptionName("sink-topic")

.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
    .subscriptionType(SubscriptionType.Shared)
    .enableBatchIndexAcknowledgment(true) // enable batch index acknowledgement
    .subscribe();