Pulsar messages are stored as unstructured byte arrays, and the data structure (also known as the schema) is applied to this data only when it's read. Both the producer and the consumer therefore need to agree on the data structure of the messages, including the fields and their associated types.
Pulsar schema is the metadata that defines how to translate the raw message bytes into a more formal structure type, serving as a protocol between the applications that generate messages and the applications that consume them. It serializes data into raw bytes before the data is published to a topic and deserializes the raw bytes before they are delivered to consumers.
Pulsar uses a schema registry as a central repository to store the registered schema information, which enables producers/consumers to coordinate the schema of a topic's messages through brokers.
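
To see why both sides must agree on the structure, the following minimal sketch decodes the same payload under two different assumptions. It uses only the Java standard library; `RawBytesDemo` and its methods are hypothetical names for illustration and are not part of the Pulsar client API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of the Pulsar client API.
final class RawBytesDemo {
    // "Producer" side: encode an int as 4 big-endian bytes.
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // "Consumer" side with the matching assumption: the payload is a 4-byte int.
    static int decodeAsInt(byte[] payload) {
        return ByteBuffer.wrap(payload).getInt();
    }

    // "Consumer" side with a mismatched assumption: the payload is UTF-8 text.
    static String decodeAsString(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encodeInt(28);
        System.out.println(decodeAsInt(payload));              // prints 28
        System.out.println(decodeAsString(payload).length());  // prints 4 (control characters, not "28")
    }
}
```

The bytes are identical in both cases; only the structure the reader applies differs, which is the gap a shared schema is meant to close.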
:::note
Currently, Pulsar schema is available for Java clients, Go clients, Python clients, Node.js clients, C++ clients, and C# clients.
:::
Type safety is extremely important in any application built around a messaging and streaming system. Raw bytes are flexible for data transfer, but the flexibility and neutrality come with a cost: you have to overlay data type checking and serialization/deserialization to ensure that the bytes fed into the system can be read and successfully consumed. In other words, you need to make sure the data is intelligible and usable to applications.
Pulsar schema resolves the pain points with the following capabilities:
Pulsar schemas are applied and enforced at the topic level. Producers and consumers can upload schemas to brokers, so Pulsar schemas work on both sides.
This diagram illustrates how Pulsar schema works on the producer side.
Below are explanations for each step.
1. The application uses a schema instance to construct a producer instance. The schema instance defines the schema for the data being produced using the producer instance. Taking Avro as an example, Pulsar extracts the schema definition from the POJO class and constructs the SchemaInfo.
2. The producer requests to connect to the broker with the SchemaInfo extracted from the passed-in schema instance.
3. The broker looks up the schema registry to check whether the schema is already registered.
4. The broker checks whether the schema can be auto-updated.
5. The broker performs the schema compatibility check defined for the topic.
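
The broker-side checks above (registry lookup, auto-update, compatibility) can be sketched as a small decision function. This is a simplified model and not Pulsar's implementation: `ProducerSchemaFlow`, its `Result` values, the single-schema-per-topic registry, and the pluggable compatibility predicate are all hypothetical. The outcome of each branch (connect or reject) is an assumption, since the text above names the checks but not their results.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical model of the broker's producer-side decisions; not Pulsar code.
final class ProducerSchemaFlow {
    enum Result { CONNECTED_EXISTING, CONNECTED_NEW_VERSION, REJECTED }

    // Toy registry: topic -> latest registered schema definition (simplification).
    private final Map<String, String> registry = new HashMap<>();
    private final boolean allowAutoUpdate;
    // (existing, proposed) -> true if compatible; stands in for the topic's strategy.
    private final BiPredicate<String, String> compatible;

    ProducerSchemaFlow(boolean allowAutoUpdate, BiPredicate<String, String> compatible) {
        this.allowAutoUpdate = allowAutoUpdate;
        this.compatible = compatible;
    }

    Result connect(String topic, String schemaDefinition) {
        String existing = registry.get(topic);
        // Registry lookup: an already registered schema connects right away.
        if (schemaDefinition.equals(existing)) {
            return Result.CONNECTED_EXISTING;
        }
        // Auto-update check (assumption: a new schema is rejected when disallowed).
        if (!allowAutoUpdate) {
            return Result.REJECTED;
        }
        // Compatibility check (assumption: trivially passes when nothing is registered).
        if (existing != null && !compatible.test(existing, schemaDefinition)) {
            return Result.REJECTED;
        }
        registry.put(topic, schemaDefinition);
        return Result.CONNECTED_NEW_VERSION;
    }
}
```

For example, with a toy rule that treats a new definition as compatible when it extends the old one, `connect("t", "name")` followed by `connect("t", "name,age")` registers two versions, while a later `connect("t", "id")` is rejected.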
This diagram illustrates how schema works on the consumer side.
Below are explanations for each step.
1. The application uses a schema instance to construct a consumer instance.
2. The consumer connects to the broker with the SchemaInfo extracted from the passed-in schema instance.
3. The broker checks whether the topic is in use, that is, whether it has at least one of the following: a schema, data, or an active producer or consumer.
4. The broker checks whether the schema can be auto-updated.
5. The broker performs the schema compatibility check.
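
The consumer-side checks can be modeled in the same spirit. The sketch below is hypothetical (`ConsumerSchemaFlow` is not a Pulsar class) and assumes a particular ordering that the text above does not spell out: an unused topic is governed by the auto-update setting, while a topic in use is governed by the compatibility check.

```java
// Hypothetical model of the broker's consumer-side decisions; not Pulsar code.
final class ConsumerSchemaFlow {
    enum Result { CONNECTED, REJECTED }

    // A topic counts as "in use" if it has a schema, data, or an active producer or consumer.
    static boolean inUse(boolean hasSchema, boolean hasData, boolean hasActiveClient) {
        return hasSchema || hasData || hasActiveClient;
    }

    static Result connect(boolean topicInUse, boolean allowAutoUpdate, boolean compatible) {
        if (!topicInUse) {
            // Assumption: on an unused topic, the consumer's schema is registered
            // only if auto-update is allowed; otherwise the connection is rejected.
            return allowAutoUpdate ? Result.CONNECTED : Result.REJECTED;
        }
        // Assumption: on a topic in use, the compatibility check decides.
        return compatible ? Result.CONNECTED : Result.REJECTED;
    }
}
```

Under these assumptions, the auto-update setting matters only for the first client on a fresh topic; once the topic holds a schema or data, compatibility becomes the deciding factor.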
You can use language-specific data types when constructing and handling messages, from simple types like strings to more complex application-specific types.
For example, suppose you use the User class to define the messages sent to Pulsar topics.
```java
public class User {
    public String name;
    public int age;

    User() {}

    User(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
```
Without a schema
If you construct a producer without specifying a schema, then the producer can only produce messages of type byte[]. If you have a POJO class, you need to serialize the POJO into bytes before sending messages.
```java
Producer<byte[]> producer = client.newProducer()
        .topic(topic)
        .create();
User user = new User("Tom", 28);
byte[] message = … // serialize the `user` by yourself;
producer.send(message);
```
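
As one illustration of what serializing by hand can involve, here is a minimal sketch using only the Java standard library. The wire format (a length-prefixed UTF string followed by a 4-byte int) and the `ManualUserCodec` class are assumptions chosen for this example; they are not a format that Pulsar prescribes. The nested `User` class mirrors the one defined earlier so that the block is self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical hand-rolled codec; the byte layout is an assumption for illustration.
final class ManualUserCodec {
    // Local copy of the User class from the example above.
    static final class User {
        final String name;
        final int age;
        User(String name, int age) { this.name = name; this.age = age; }
    }

    // Producer side: name as a length-prefixed UTF string, then age as a 4-byte int.
    static byte[] serialize(User user) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(user.name);
            out.writeInt(user.age);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Consumer side: must read the fields in exactly the same order and types.
    static User deserialize(byte[] payload) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            String name = in.readUTF();
            int age = in.readInt();
            return new User(name, age);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The result of `ManualUserCodec.serialize(...)` is a `byte[]` that a `Producer<byte[]>` could send, but every consumer then has to carry a matching `deserialize` and keep it in sync by hand whenever the fields change.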
With a schema
This example constructs a producer with JSONSchema, so you can send User objects to topics directly without worrying about how to serialize POJOs into bytes.
```java
// send with json schema
Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
        .topic(topic)
        .create();
User user = new User("Tom", 28);
producer.send(user);

// receive with json schema (subscribes to the same topic the producer writes to)
Consumer<User> consumer = client.newConsumer(JSONSchema.of(User.class))
        .topic(topic)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName("schema-sub")
        .subscribe();
Message<User> message = consumer.receive();
// use a distinct variable name so both snippets can share one scope
User received = message.getValue();
assert received.age == 28 && received.name.equals("Tom");
```