Pulsar relies on two external systems for essential tasks:
ZooKeeper and BookKeeper are both open-source Apache projects.
Skip to the How Pulsar uses ZooKeeper and BookKeeper section below for a more schematic explanation of the role of these two systems in Pulsar.
Each Pulsar instance relies on two separate ZooKeeper quorums.
ZooKeeper manages a variety of essential coordination-related and configuration-related tasks for Pulsar.
To deploy a Pulsar instance, you need to stand up one local ZooKeeper cluster per Pulsar cluster.
To begin, add all ZooKeeper servers to the quorum configuration specified in the conf/zookeeper.conf
file. Add a server.N
line for each node in the cluster to the configuration, where N
is the number of the ZooKeeper node. The following is an example for a three-node cluster:
server.1=zk1.us-west.example.com:2888:3888 server.2=zk2.us-west.example.com:2888:3888 server.3=zk3.us-west.example.com:2888:3888
On each host, you need to specify the node ID in myid
file of each node, which is in data/zookeeper
folder of each server by default (you can change the file location via the dataDir
parameter).
See the Multi-server setup guide in the ZooKeeper documentation for detailed information on
myid
and more.
On a ZooKeeper server at zk1.us-west.example.com
, for example, you can set the myid
value like this:
$ mkdir -p data/zookeeper $ echo 1 > data/zookeeper/myid
On zk2.us-west.example.com
the command is echo 2 > data/zookeeper/myid
and so on.
Once you add each server to the zookeeper.conf
configuration and each server has the appropriate myid
entry, you can start ZooKeeper on all hosts (in the background, using nohup) with the pulsar-daemon
CLI tool:
$ bin/pulsar-daemon start zookeeper
The ZooKeeper cluster configured and started up in the section above is a local ZooKeeper cluster that you can use to manage a single Pulsar cluster. In addition to a local cluster, however, a full Pulsar instance also requires a configuration store for handling some instance-level configuration and coordination tasks.
If you deploy a single-cluster instance, you do not need a separate cluster for the configuration store. If, however, you deploy a multi-cluster instance, you need to stand up a separate ZooKeeper cluster for configuration tasks.
If your Pulsar instance consists of just one cluster, then you can deploy a configuration store on the same machines as the local ZooKeeper quorum but run on different TCP ports.
To deploy a ZooKeeper configuration store in a single-cluster instance, add the same ZooKeeper servers that the local quorum uses to the configuration file in conf/global_zookeeper.conf
using the same method for local ZooKeeper, but make sure to use a different port (2181 is the default for ZooKeeper). The following is an example that uses port 2184 for a three-node ZooKeeper cluster:
clientPort=2184 server.1=zk1.us-west.example.com:2185:2186 server.2=zk2.us-west.example.com:2185:2186 server.3=zk3.us-west.example.com:2185:2186
As before, create the myid
files for each server on data/global-zookeeper/myid
.
When you deploy a global Pulsar instance, with clusters distributed across different geographical regions, the configuration store serves as a highly available and strongly consistent metadata store that can tolerate failures and partitions spanning whole regions.
The key here is to make sure the ZK quorum members are spread across at least 3 regions and that other regions run as observers.
Again, given the very low expected load on the configuration store servers, you can share the same hosts used for the local ZooKeeper quorum.
For example, you can assume a Pulsar instance with the following clusters us-west
, us-east
, us-central
, eu-central
, ap-south
. Also you can assume, each cluster has its own local ZK servers named such as
zk[1-3].${CLUSTER}.example.com
In this scenario you want to pick the quorum participants from few clusters and let all the others be ZK observers. For example, to form a 7 servers quorum, you can pick 3 servers from us-west
, 2 from us-central
and 2 from us-east
.
This guarantees that writes to configuration store is possible even if one of these regions is unreachable.
The ZK configuration in all the servers looks like:
clientPort=2184 server.1=zk1.us-west.example.com:2185:2186 server.2=zk2.us-west.example.com:2185:2186 server.3=zk3.us-west.example.com:2185:2186 server.4=zk1.us-central.example.com:2185:2186 server.5=zk2.us-central.example.com:2185:2186 server.6=zk3.us-central.example.com:2185:2186:observer server.7=zk1.us-east.example.com:2185:2186 server.8=zk2.us-east.example.com:2185:2186 server.9=zk3.us-east.example.com:2185:2186:observer server.10=zk1.eu-central.example.com:2185:2186:observer server.11=zk2.eu-central.example.com:2185:2186:observer server.12=zk3.eu-central.example.com:2185:2186:observer server.13=zk1.ap-south.example.com:2185:2186:observer server.14=zk2.ap-south.example.com:2185:2186:observer server.15=zk3.ap-south.example.com:2185:2186:observer
Additionally, ZK observers need to have:
peerType=observer
Once your configuration store configuration is in place, you can start up the service using pulsar-daemon
$ bin/pulsar-daemon start configuration-store
In Pulsar, ZooKeeper configuration is handled by two separate configuration files in the conf
directory of your Pulsar installation: conf/zookeeper.conf
for local ZooKeeper and conf/global-zookeeper.conf
for configuration store.
The conf/zookeeper.conf
file handles the configuration for local ZooKeeper. The table below shows the available parameters:
Name | Description | Default |
---|---|---|
tickTime | The tick is the basic unit of time in ZooKeeper, measured in milliseconds and used to regulate things like heartbeats and timeouts. tickTime is the length of a single tick. | 2000 |
initLimit | The maximum time, in ticks, that the leader ZooKeeper server allows follower ZooKeeper servers to successfully connect and sync. The tick time is set in milliseconds using the tickTime parameter. | 10 |
syncLimit | The maximum time, in ticks, that a follower ZooKeeper server is allowed to sync with other ZooKeeper servers. The tick time is set in milliseconds using the tickTime parameter. | 5 |
dataDir | The location where ZooKeeper stores in-memory database snapshots as well as the transaction log of updates to the database. | data/zookeeper |
clientPort | The port on which the ZooKeeper server listens for connections. | 2181 |
autopurge.snapRetainCount | In ZooKeeper, auto purge determines how many recent snapshots of the database stored in dataDir to retain within the time interval specified by autopurge.purgeInterval (while deleting the rest). | 3 |
autopurge.purgeInterval | The time interval, in hours, which triggers the ZooKeeper database purge task. Setting to a non-zero number enables auto purge; setting to 0 disables. Read this guide before enabling auto purge. | 1 |
maxClientCnxns | The maximum number of client connections. Increase this if you need to handle more ZooKeeper clients. | 60 |
The conf/global-zookeeper.conf
file handles the configuration for configuration store. The table below shows the available parameters:
BookKeeper is responsible for all durable message storage in Pulsar. BookKeeper is a distributed write-ahead log WAL system that guarantees read consistency of independent message logs calls ledgers. Individual BookKeeper servers are also called bookies.
For a guide to managing message persistence, retention, and expiry in Pulsar, see this cookbook.
BookKeeper provides persistent message storage for Pulsar.
Each Pulsar broker needs to have its own cluster of bookies. The BookKeeper cluster shares a local ZooKeeper quorum with the Pulsar cluster.
You can configure BookKeeper bookies using the conf/bookkeeper.conf
configuration file. The most important aspect of configuring each bookie is ensuring that the zkServers
parameter is set to the connection string for local ZooKeeper of the Pulsar cluster.
You can start up a bookie in two ways: in the foreground or as a background daemon.
To start up a bookie in the foreground, use the bookkeeper
CLI tool:
$ bin/bookkeeper bookie
To start a bookie in the background, use the pulsar-daemon
CLI tool:
$ bin/pulsar-daemon start bookie
You can verify that the bookie works properly using the bookiesanity
command for the BookKeeper shell:
$ bin/bookkeeper shell bookiesanity
This command creates a new ledger on the local bookie, writes a few entries, reads them back and finally deletes the ledger.
Bookie hosts are responsible for storing message data on disk. In order for bookies to provide optimal performance, ensuring that the bookies have a suitable hardware configuration is essential. You can choose two key dimensions to bookie hardware capacity:
Message entries written to bookies are always synced to disk before returning an acknowledgement to the Pulsar broker. To ensure low write latency, BookKeeper is designed to use multiple devices:
you can find configurable parameters for BookKeeper bookies in the conf/bookkeeper.conf
file.
Minimum configuration changes required in conf/bookkeeper.conf
are:
# Change to point to journal disk mount point journalDirectory=data/bookkeeper/journal # Point to ledger storage disk mount point ledgerDirectories=data/bookkeeper/ledgers # Point to local ZK quorum zkServers=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
To change the zookeeper root path that Bookkeeper uses, use zkLedgersRootPath=/MY-PREFIX/ledgers instead of zkServers=localhost:2181/MY-PREFIX
Consult the official BookKeeper docs for more information about BookKeeper.
In Pulsar, you can set persistence policies, at the namespace level, that determine how BookKeeper handles persistent storage of messages. Policies determine four things:
You can set persistence policies for BookKeeper at the namespace level.
Use the set-persistence
subcommand and specify a namespace as well as any policies that you want to apply. The available flags are:
Flag | Description | Default |
---|---|---|
-a , --bookkeeper-ack-quorum | The number of acks (guaranteed copies) to wait on for each entry | 0 |
-e , --bookkeeper-ensemble | The number of bookies to use for topics in the namespace | 0 |
-w , --bookkeeper-write-quorum | The number of writes to make for each entry | 0 |
-r , --ml-mark-delete-max-rate | Throttling rate for mark-delete operations (0 means no throttle) | 0 |
The following is an example:
$ pulsar-admin namespaces set-persistence my-tenant/my-ns \ --bookkeeper-ack-quorum 3 \ --bookkeeper-ensemble 2
{@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/persistence|operation/setPersistence?version=[[pulsar:version_number]]}
int bkEnsemble = 2; int bkQuorum = 3; int bkAckQuorum = 2; double markDeleteRate = 0.7; PersistencePolicies policies = new PersistencePolicies(ensemble, quorum, ackQuorum, markDeleteRate); admin.namespaces().setPersistence(namespace, policies);
You can see which persistence policy currently applies to a namespace.
Use the get-persistence
subcommand and specify the namespace.
The following is an example:
$ pulsar-admin namespaces get-persistence my-tenant/my-ns { "bookkeeperEnsemble": 1, "bookkeeperWriteQuorum": 1, "bookkeeperAckQuorum", 1, "managedLedgerMaxMarkDeleteRate": 0 }
{@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/persistence|operation/getPersistence?version=[[pulsar:version_number]]}
PersistencePolicies policies = admin.namespaces().getPersistence(namespace);
This diagram illustrates the role of ZooKeeper and BookKeeper in a Pulsar cluster:
Each Pulsar cluster consists of one or more message brokers. Each broker relies on an ensemble of bookies.