Pulsar is a multi-tenant, high-performance solution for server to server messaging. Key features include:
At a high level, a Pulsar instance is composed of one or multiple clusters, each could reside in different geographical regions. A single Pulsar cluster is composed of a set of message brokers and bookkeepers, plus zookeeper ensembles for coordination and configuration management. Finally, a client library is provided with easy-to-use APIs.
The Pulsar broker is a state-less component which primarily runs two different components: a HTTP server that exposes a REST interface for topic lookup and administrative tasks, and a dispatcher, which is an asynchronous TCP server over a custom binary protocol used for all data transfers.
The messages are typically dispatched out of the managed ledger cache, unless the backlog exceeds the cache size, in which case the broker will start reading entries from Bookkeeper.
Finally, to support geo replication on global topics, the broker manages the replicators which tail the entries published in the local region and republish them to the remote region using the Pulsar client library itself.
Pulsar uses Apache Bookkeeper as durable storage which is a distributed write-ahead log system. With Bookkeeper, applications can create many independent logs, called ledgers. A ledger is an append-only data structure with a single writer that is assigned to multiple storage nodes (or bookies) and whose entries are replicated to multiple of these nodes. The semantics of a ledger are very simple: a process can create a ledger, append entries and close the ledger. After the ledger has been closed, either explicitly or because the writer process crashed, it can only be opened in read-only mode. Finally, when the entries contained in the ledger are no longer needed, the whole ledger can be deleted from the system.
The main strength of Bookkeeper is to guarantee the read consistency of the ledger in the presence of failures. Since the ledger can only be written by a single process, this process is free to append entries very efficiently (without need for further consensus) and after a failure, the ledger will go through a recovery process that will finalize the state of the ledger and establish which entry was last committed to the log. After that point, all readers of the ledger are guaranteed to see the exact same content.
Pulsar uses BookKeeper since it is a very efficient sequential store that handles entry replication, node failures, and it is horizontally scalable in capacity and throughput. From an operational perspective, the capacity could be immediately increased by simply adding more bookies to a Pulsar cluster. The other strength of Bookkeeper is that the bookies are designed to handle thousands of ledgers with concurrent reads and writes and, by using multiple disk devices (one for journal and another for general storage) are able to isolate the effects of read operations from the latency of ongoing write operations.
Given that Bookkeeper ledgers provide a single log abstraction, a library was developed on top of the ledger called managed ledger which represents the storage layer for a single topic. A managed ledger represents the abstraction of a stream of messages with a single writer that keeps appending at the end of the stream and multiple cursors that are consuming the stream, each with its own associated position.
Internally, a single managed ledger uses multiple Bookkeeper ledgers to store the data. There are two reasons to have multiple ledgers: first, after a failure a ledger is not writable anymore and we need to create a new one, and second we want to rollover ledgers periodically so we may delete a ledger when all cursors have consumed the messages it contains.
Pulsar uses Apache Zookeeper for metadata, cluster configuration and coordination.
A topic is a logical endpoint for publishing and consuming messages. Producers publish messages to the topic and consumers subscribe to the topic, to consume messages published to the topic. Pulsar allows multiple subscription modes on a topic to support pub/sub, load balancer, and failover use-cases.
A normal topic (except partitioned topic) does not need to be explicitly created, it is created on the fly when client try to publish/consume messages on the topic.
Subscription is a durable resource that gets created the first time a consumer subscribes to the topic with the given subscription name. It receives all the messages published on the topic after its own creation. If no consumer is attached to this subscription, all the messages published on the topic will be retained as backlog. Finally, a consumer can unsubscribe to remove the subscription from the topic.
Subscription is a configuration rule that determines how messages are delivered to a consumer.
Exclusive
Shared
Failover
Property and namespace are two key concepts of Pulsar to support multi-tenant.
E.g.: my-property/us-w/my-app1
is a namespace for the application my-app1
in cluster us-w
for my-property
.
Topics names for such namespaces will look like:
persistent://my-property/us-w/my-app1/my-topic-1 persistent://my-property/us-w/my-app1/my-topic-2 ...
A producer attaches to a topic and produces messages.
sync vs. async send - message could be sent to broker in sync or async manner:
compression - message could be compressed during transportation to save bandwidth (compression and de-compression both are performed by client), below types of compression are supported:
batch - if batching is enabled, producer will try to accumulate and send batch of messages in a single request. Batching size defined by maximum number of messages and maximum publish latency.
A consumer attaches to a subscription and receives messages.
sync vs. async receive - sync receive will be blocked until a message is available. Async receive will return immediately with an instance of CompletableFuture, which completes with received message once new message is available.
acknowledgement - message could be acknowledged individually one by one or cumulatively. With cumulative acknowledgement consumer only need to acknowledge the last message it received, all messages in the stream up to (and include) the provided message will not be re-delivered to this consumer. Cumulative acknowledgement cannot be used with Shared subscription mode.
listener - a customized MessageListener implementation could be passed to consumer, client library will call the listener whenever a new message is received (no need to call consumer receive).
A normal topic could only be served by a single broker which limits its maximum throughput, partitioned topic as a special type of topic could span across multiple brokers to achieve higher throughput. Partitioned topic need to be explicitly created via admin API/CLI, number of partitions can be specified when creating the topic.
A partitioned topic is actually implemented as N (number of partitions) internal topics, there is no difference between the internal topics and other normal topics on how subscription modes work.
routing mode - routing mode decides which partition (internal topic) a message will be published to:
public interface MessageRouter extends Serializable { /** * @param msg Message object * @return The index of the partition to use for the message */ int choosePartition(Message msg); }
Guaranteed message delivery requires that messages are stored in a durable manner until they are delivered to and acknowledged by consumers. This mode of messaging is commonly called Persistent Messaging.
Message durability is set at the topic level. A topic can either be “persistent” or “non-persistent” and that is included in its own name: persistent://my-property/global/my-ns/my-topic
persistent
non-persistent
Pulsar enables messages to be produced and consumed in different geo-locations. For instance your application may be publishing data in one geo/market and you would like to process it for consumption in other geos/markets. Global Replication in Pulsar enable you to do that.
Pulsar supports a pluggable authentication mechanism which can be configured at broker and it also supports authorization to identify client and its access rights on topics and properties.
Pulsar exposes a client API with Java language bindings. The client API optimizes and encapsulates client-broker communication protocol and exposes a simple and intuitive API for use by the application. Under the hood, the client library supports transparent reconnection and/or connection failover to a broker, queuing of messages until acknowledged by broker and heuristics such as connection retries with backoff.
When an application wants to create a producer/consumer, the Pulsar client library will internally initiate the setup phase that is composed of two steps. The first task is to find owner for the topic by sending a lookup HTTP request. The request could reach one of the active brokers which, by looking at the (cached) zookeeper metadata will know who is serving the topic or, in case nobody is serving it, will try to assign it to the least loaded broker.
Once the client library has the broker address, it will create a TCP connection (or reuse an existing connection from the pool) and authenticate it. Within this connection, client and broker exchange binary commands from a custom protocol. At this point the client will send a command to create producer/consumer to the broker, which will comply after having validated the authorization policy.
Whenever the TCP connection breaks, the client will immediately re-initiate this setup phase and will keep trying with exponential back-off to re-establish the producer or consumer until it succeeds.
A PulsarClient (TODO: javadocs) instance is needed before producing/consuming messages.
ClientConfiguration config = new ClientConfiguration(); PulsarClient pulsarClient = PulsarClient.create("pulsar://broker.example.com:6650", config); ... pulsarClient.close();
ClientConfiguration (TODO: javadocs) is used to pass arguments to PulsarClient:
// Set the authentication provider to use in the Pulsar client instance. public void setAuthentication(Authentication authentication); public void setAuthentication(String authPluginClassName, String authParamsString); public void setAuthentication(String authPluginClassName, Map<String, String> authParams); // Set the operation timeout(default: 30 seconds) public void setOperationTimeout(int operationTimeout, TimeUnit unit); // Set the number of threads to be used for handling connections to brokers (default: 1 thread) public void setIoThreads(int numIoThreads); // Set the number of threads to be used for message listeners(default: 1 thread) public void setListenerThreads(int numListenerThreads); // Sets the max number of connection that the client library will open to a single broker. public void setConnectionsPerBroker(int connectionsPerBroker); // Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm. public void setUseTcpNoDelay(boolean useTcpNoDelay);
Create a Consumer (TODO javadocs) with PulsarClient and receive 10 messages.
ConsumerConfiguration conf = new ConsumerConfiguration(); conf.setSubscriptionType(SubscriptionType.Exclusive); Consumer consumer = pulsarClient.subscribe( "persistent://my-property/us-w/my-ns/my-topic", "my-subscriber-name", conf); for (int i = 0; i < 10; i++) { // Receive a message Msg msg = consumer.receive(); // Do something System.out.println("Received: " + new String(msg.getData())); // Acknowledge successful message processing consumer.acknowledge(msg); } consumer.close();
ConsumerConfiguration (TODO javadocs) could be used to pass arguments to consumer:
// Set the timeout for unacked messages, truncated to the nearest millisecond. public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit timeUnit); // Select the subscription type to be used when subscribing to the topic. public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionType); // Sets a MessageListener for the consumer public ConsumerConfiguration setMessageListener(MessageListener messageListener); // Sets the size of the consumer receive queue. public ConsumerConfiguration setReceiverQueueSize(int receiverQueueSize);
Creates a Producer (TODO javadocs) with PulsarClient and publish 10 messages.
ProducerConfiguration config = new ProducerConfiguration(); Producer producer = pulsarClient.createProducer( "persistent://my-property/us-w/my-ns/my-topic", config); // publish 10 messages to the topic for (int i = 0; i < 10; i++) { producer.send("my-message".getBytes()); } producer.close();
ProducerConfiguration (TODO javadocs) could be used to pass arguments to producer:
// Set the send timeout (default: 30 seconds) public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit unit); // Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages); // Set whether the Producer#send and Producer#sendAsync operations should block when the outgoing message queue is full. public ProducerConfiguration setBlockIfQueueFull(boolean blockIfQueueFull); // Set the message routing mode for the partitioned producer public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRouteMode); // Set the compression type for the producer. public ProducerConfiguration setCompressionType(CompressionType compressionType); // Set a custom message routing policy by passing an implementation of MessageRouter public ProducerConfiguration setMessageRouter(MessageRouter messageRouter); // Control whether automatic batching of messages is enabled for the producer, default: false. public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled); // Set the time period within which the messages sent will be batched, default: 10ms. public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit); // Set the maximum number of messages permitted in a batch, default: 1000. public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessagesPerBatch);