Pulsar Cluster Setup

Setup

System requirements

Supported platforms:

  • Linux
  • macOS

Required software:

  • Java 1.8

Components

ZooKeeper

Add all ZK servers to the quorum configuration. Edit conf/zookeeper.conf and add the following lines on all the ZK servers:

server.1=zk1.us-west.example.com:2888:3888
server.2=zk2.us-west.example.com:2888:3888
server.3=zk3.us-west.example.com:2888:3888
...
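
In addition to the quorum configuration, each ZooKeeper server must know its own ID within the quorum, which it reads from a myid file in its data directory. A minimal sketch for server 1 (use 2, 3, ... on the other hosts); the data/zookeeper path assumes the default dataDir value in conf/zookeeper.conf, so adjust it if you changed that setting:

$ mkdir -p data/zookeeper
$ echo 1 > data/zookeeper/myid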

Start ZK service on all the hosts:

$ bin/pulsar-daemon start zookeeper

Global ZooKeeper

Configure the global quorum by adding the participants and all the observers.

Single-cluster Pulsar instance

When deploying a Pulsar instance with a single cluster, the global ZooKeeper can be deployed on the same machines as the local ZK quorum, running on different TCP ports. Add the servers to conf/global_zookeeper.conf to start the service on port 2184:

clientPort=2184
server.1=zk1.us-west.example.com:2185:2186
server.2=zk2.us-west.example.com:2185:2186
server.3=zk3.us-west.example.com:2185:2186
...
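
Since each host now runs two ZooKeeper processes, the global ZooKeeper needs its own data directory and its own myid file, separate from the ones used by the local quorum. A hedged sketch for server 1 (the data/global-zookeeper path is an assumption; use whatever dataDir is set in conf/global_zookeeper.conf):

$ mkdir -p data/global-zookeeper
$ echo 1 > data/global-zookeeper/myid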

Multi-cluster Pulsar instance

When deploying a global Pulsar instance, with clusters distributed across different geographical regions, the global ZooKeeper serves as a highly available and strongly consistent metadata store that can tolerate whole-region failures and partitions.

The key here is to make sure the ZK quorum members are spread across at least 3 regions, and that servers in the other regions run as observers.

Again, given the very low expected load on the global ZooKeeper servers, we can share the same hosts used for the local ZooKeeper quorum.

For example, let's assume a Pulsar instance with the following clusters: us-west, us-east, us-central, eu-central and ap-south. Also, let's assume each cluster has its own local ZK servers named like

zk[1-3].${CLUSTER}.example.com

In this scenario we want to pick the quorum participants from a few clusters and let all the others be ZK observers. For example, to form a 7-server quorum, we can pick 3 servers from us-west, 2 from us-central and 2 from us-east.

This will guarantee that writes to the global ZooKeeper remain possible even if one of these regions is unreachable: with 7 participants, a majority of 4 votes is required, and losing all the servers in any single one of these regions still leaves at least 4 participants available.

The ZK configuration on all the servers will look like:

clientPort=2184
server.1=zk1.us-west.example.com:2185:2186
server.2=zk2.us-west.example.com:2185:2186
server.3=zk3.us-west.example.com:2185:2186
server.4=zk1.us-central.example.com:2185:2186
server.5=zk2.us-central.example.com:2185:2186
server.6=zk3.us-central.example.com:2185:2186:observer
server.7=zk1.us-east.example.com:2185:2186
server.8=zk2.us-east.example.com:2185:2186
server.9=zk3.us-east.example.com:2185:2186:observer
server.10=zk1.eu-central.example.com:2185:2186:observer
server.11=zk2.eu-central.example.com:2185:2186:observer
server.12=zk3.eu-central.example.com:2185:2186:observer
server.13=zk1.ap-south.example.com:2185:2186:observer
server.14=zk2.ap-south.example.com:2185:2186:observer
server.15=zk3.ap-south.example.com:2185:2186:observer

Additionally, ZK observers will need to have the following in their configuration:

peerType=observer

Starting the service

$ bin/pulsar-daemon start global-zookeeper

Cluster metadata initialization

When setting up a new cluster, there is some metadata that needs to be initialized for the first time. The following command will prepare both the BookKeeper and the Pulsar metadata.

$ bin/pulsar initialize-cluster-metadata --cluster us-west \
                                         --zookeeper zk1.us-west.example.com:2181 \
                                         --global-zookeeper zk1.us-west.example.com:2184 \
                                         --service-url http://pulsar.us-west.example.com:8080/ \
                                         --service-url-tls https://pulsar.us-west.example.com:8443/

BookKeeper

Bookie hosts are responsible for storing data on disk, so it is very important to have a suitable hardware configuration to ensure good performance.

There are two key dimensions for capacity:

  • Disk I/O capacity (read/write)
  • Storage capacity

Entries written to a Bookie are always synced to disk before returning an acknowledgment to the Pulsar broker. To ensure low write latency, BookKeeper is designed to use multiple devices:

  • a journal to ensure durability

    • It is critical to have a fast fsync operation on this device for sequential writes. Typically, a small and fast SSD will be fine, or an HDD with a RAID controller and a battery-backed write cache. Both solutions can reach fsync latencies of ~0.4 ms (a rough way to gauge this is sketched after this list).
  • the “Ledger storage device”

    • This is where data is stored until all the consumers have acknowledged the messages. Writes happen in the background, so write I/O is not a big concern. Reads happen sequentially most of the time, and only when some consumer is draining its backlog. A typical configuration will use multiple HDDs with a RAID controller, to be able to store large amounts of data.
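
As a rough check of whether a candidate journal device delivers the required sync-write performance, a synchronous write test with dd can give a first approximation: it writes 1000 blocks of 4 KB, syncing each one to disk before the next, so the elapsed time divided by the block count approximates the per-write sync latency. This is only a sketch (the /mnt/journal path is an assumption), and a proper benchmark of the actual workload is preferable:

$ dd if=/dev/zero of=/mnt/journal/sync-test bs=4k count=1000 oflag=dsync
$ rm /mnt/journal/sync-test
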
Configuration

The minimum configuration changes required in conf/bookkeeper.conf are:

# Change to point to journal disk mount point
journalDirectory=data/bookkeeper/journal

# Point to ledger storage disk mount point
ledgerDirectories=data/bookkeeper/ledgers

# Point to local ZK quorum
zkServers=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181

# Change the ledger manager type
ledgerManagerType=hierarchical
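
If the ledger storage spans multiple disks, as suggested above, ledgerDirectories accepts a comma-separated list of directories, one per mount point. For example (the paths below are illustrative):

# One ledger directory per disk
ledgerDirectories=/mnt/ledgers1,/mnt/ledgers2,/mnt/ledgers3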

Please consult http://bookkeeper.apache.org/ for more extensive documentation on Apache BookKeeper.

Starting the service

Start the bookie:

$ bin/pulsar-daemon start bookie

Verify the bookie is working properly:

$ bin/bookkeeper shell bookiesanity

This will create a new ledger on the local bookie, write a few entries, read them back and finally delete the ledger.

Broker

Pulsar brokers do not need any special hardware considerations, since they don't use the local disk. Fast CPUs and a 10 Gbps NIC are recommended, since the software can take full advantage of them.

Minimal configuration changes in conf/broker.conf will include:

# Local ZK servers
zookeeperServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181

# Global Zookeeper quorum connection string. Here we just need to specify the
# servers located in the same cluster
globalZookeeperServers=zk1.us-west.example.com:2184,zk2.us-west.example.com:2184,zk3.us-west.example.com:2184

clusterName=us-west
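
The broker's listen ports are also set in conf/broker.conf. The usual defaults are 6650 for the binary protocol and 8080 for the HTTP service (with 6651 and 8443 as the common TLS counterparts); they generally don't need to be changed and are shown here only for reference, so double-check them against your own broker.conf:

# Ports the broker listens on (typical defaults; verify against your broker.conf)
brokerServicePort=6650
webServicePort=8080
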
Start broker service
$ bin/pulsar-daemon start broker

Service discovery

The service discovery component is used to give clients a single URL to use.

You can either use the provided discovery-service or any other method. The only requirement is that when the client makes an HTTP request to http://pulsar.us-west.example.com:8080/, it must be redirected (through DNS, IP or HTTP redirect) to an active broker, without preference.

The included discovery service works with HTTP redirects and maintains the list of active brokers, obtained from ZooKeeper.

Add the ZK servers in conf/discovery.conf:

# Zookeeper quorum connection string
zookeeperServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181

# Global zookeeper quorum connection string
globalZookeeperServers=zk1.us-west.example.com:2184,zk2.us-west.example.com:2184,zk3.us-west.example.com:2184

Start the service:

$ bin/pulsar-daemon start discovery

Admin client and verification

At this point the cluster should be ready to use. We can now configure a client machine that can serve as the administrative client.

Edit conf/client.conf to point the client to the correct service URL:

serviceUrl=http://pulsar.us-west.example.com:8080/
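
As a quick check that the admin client is wired up correctly, list the clusters known to the instance; the output should include the cluster created during metadata initialization (here us-west):

$ bin/pulsar-admin clusters list
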
Provisioning a new tenant

To allow a new tenant to use the system, we need to create a new property. Typically this will be done by the Pulsar cluster administrator or by some automated tool:

$ bin/pulsar-admin properties create test \
                --allowed-clusters us-west \
                --admin-roles test-admin-role

This will allow users who identify with the role test-admin-role to administer the configuration for the property test, which will only be allowed to use the cluster us-west.

From now on, the tenant will be able to self-manage its resources.

The first step is to create a namespace. A namespace is an administrative unit that can contain many topics. A common practice is to create a namespace for each different use case of a single tenant.

$ bin/pulsar-admin namespaces create test/us-west/ns1
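
To confirm the namespace was created, the namespaces of the property can be listed; the output should include test/us-west/ns1:

$ bin/pulsar-admin namespaces list test
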
Testing producer and consumer

Everything is now ready to send and receive messages. The quickest way to test the system is through the pulsar-perf client tool.

Let's use a topic in the namespace we just created. Topics are automatically created the first time a producer or a consumer tries to use them.

The topic name in this case could be:

persistent://test/us-west/ns1/my-topic

Start a consumer that will create a subscription on the topic and will wait for messages:

$ bin/pulsar-perf consume persistent://test/us-west/ns1/my-topic

Start a producer that publishes messages at a fixed rate and reports stats every 10 seconds:

$ bin/pulsar-perf produce persistent://test/us-west/ns1/my-topic

To report the topic stats:

$ bin/pulsar-admin persistent stats persistent://test/us-west/ns1/my-topic
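
Besides pulsar-perf, individual messages can be sent and received with the pulsar-client command-line tool, which is handy for one-off functional checks. A hedged sketch (flag names can differ slightly between versions, so check bin/pulsar-client --help):

$ bin/pulsar-client produce persistent://test/us-west/ns1/my-topic --messages "hello-pulsar"
$ bin/pulsar-client consume persistent://test/us-west/ns1/my-topic --subscription-name my-sub --num-messages 1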

Monitoring

Broker stats

Pulsar broker metrics can be collected from the brokers and are exported in JSON format.

There are two main types of metrics:

  • Destination dump, containing stats for each individual topic:

    $ bin/pulsar-admin broker-stats destinations

  • Broker metrics, containing broker info and topic stats aggregated at the namespace level:

    $ bin/pulsar-admin broker-stats monitoring-metrics

All the message rates are updated every minute.

BookKeeper stats

There are several stats frameworks that work with BookKeeper; they can be enabled by changing the statsProviderClass in conf/bookkeeper.conf.

With the configuration below, the DataSketchesMetricsProvider will be enabled. It features a very efficient way to compute latency quantiles, along with rates and counts.

The stats are dumped every interval into a JSON file that is overwritten each time.

statsProviderClass=org.apache.bookkeeper.stats.datasketches.DataSketchesMetricsProvider
dataSketchesMetricsJsonFileReporter=data/bookie-stats.json
dataSketchesMetricsUpdateIntervalSeconds=60