Apache Pulsar currently provides a built-in schema management system tightly coupled with the broker. Pulsar clients interact with this system implicitly when creating producers and consumers.
However, many organizations already have independent schema registry services (such as Confluent Schema Registry) and wish to reuse their existing schema governance processes across multiple messaging systems, including Pulsar.
By enabling Pulsar clients to integrate with third-party schema registry services:
This flexibility is particularly valuable for enterprises with strict schema validation, versioning, and governance workflows already centralized in external registries.
AutoProduceBytesSchema and AutoConsumeSchema.

This PIP aims to enable the Pulsar client to directly integrate with external schema registry services for schema management. In this model, the external schema registry is fully responsible for schema storage, retrieval, and validation. The Pulsar broker will no longer manage schema data for topics using external schemas.
Pulsar will introduce a new schema type: SchemaType.EXTERNAL.
SchemaType.EXTERNAL.
With the EXTERNAL schema type, the Pulsar client will provide empty schema data to the broker.
SchemaType.EXTERNAL is not compatible with other Pulsar schemas.

This design isolates external schema management and protects existing topics using Pulsar’s native schema system.
To integrate with external schema registries, users can:
Implement the Schema interface to define custom schema encoding and decoding logic.

Schema Interface Methods:
The Schema interface introduces methods for encoding and decoding messages, allowing external schema implementations to handle serialization and deserialization.
EncodeData encode(String topic, T message) (New addition)
Returns an EncodeData object that contains the encoded data and the schema ID.
Throws SchemaSerializationException if the serialization or deserialization fails.
T decode(String topic, byte[] data, byte[] schemaId) (New addition)
closeAsync() (New addition)
During producer or consumer initialization: The external schema info will be registered to Pulsar schema storage.
During producing or receiving messages: The encode and decode methods handle the schema-aware serialization and deserialization using the external schema registry.
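The encode and decode flow described above can be sketched without any Pulsar dependency. The block below is a minimal illustration, not the PIP's implementation: `InMemoryRegistry`, `EncodedPayload`, and `RegistryBackedStringSchema` are hypothetical names, the registry is an in-memory stand-in for a real external service, and the 4-byte integer schema ID is an assumption made for this example only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an external registry client: maps schema definitions to integer IDs.
final class InMemoryRegistry {
    private final Map<String, Integer> idsByDefinition = new ConcurrentHashMap<>();
    private final Map<Integer, String> definitionsById = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger(1);

    // Registers the definition if it is new and returns its ID either way.
    int register(String definition) {
        return idsByDefinition.computeIfAbsent(definition, d -> {
            int id = nextId.getAndIncrement();
            definitionsById.put(id, d);
            return id;
        });
    }

    String lookup(int id) {
        String definition = definitionsById.get(id);
        if (definition == null) {
            throw new IllegalArgumentException("Unknown schema ID: " + id);
        }
        return definition;
    }
}

// Local mirror of the proposed EncodeData holder: encoded payload plus schema ID.
final class EncodedPayload {
    final byte[] data;
    final byte[] schemaId;

    EncodedPayload(byte[] data, byte[] schemaId) {
        this.data = data;
        this.schemaId = schemaId;
    }
}

// Sketch of an external schema for String messages, shaped like the proposed
// encode(topic, message) and decode(topic, data, schemaId) methods.
final class RegistryBackedStringSchema {
    private static final String DEFINITION = "{\"type\":\"string\"}";
    private final InMemoryRegistry registry;

    RegistryBackedStringSchema(InMemoryRegistry registry) {
        this.registry = registry;
    }

    // The topic is unused here; a real registry client might derive a subject name from it.
    EncodedPayload encode(String topic, String message) {
        int id = registry.register(DEFINITION);
        // Assumption: the schema ID is serialized as a 4-byte big-endian integer.
        byte[] schemaId = ByteBuffer.allocate(Integer.BYTES).putInt(id).array();
        return new EncodedPayload(message.getBytes(StandardCharsets.UTF_8), schemaId);
    }

    String decode(String topic, byte[] data, byte[] schemaId) {
        int id = ByteBuffer.wrap(schemaId).getInt();
        registry.lookup(id); // throws if the ID was never registered
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

In this sketch the schema ID travels next to the payload rather than inside it, which mirrors the idea of carrying the ID separately from the message data.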
Unlike Pulsar, which uses schema version to identify schemas, many external schema registry systems use schema ID as the primary schema identifier.
When integrating with external schema registries:
The schemaVersion will point to external schema info.
The encode method returns an EncodeData object that contains the encoded data and the schema ID.
The schema ID is stored in the schema_id field in the MessageMetadata.

The KeyValueSchemaID is also a byte array, with the format: keySchemaIdLength(4) + keySchemaId + valueSchemaIdLength(4) + valueSchemaId. External schemas need to decode the key and value schema IDs from the KeyValueSchemaID.
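The KeyValueSchemaID layout can be illustrated with a small codec. This is a sketch under stated assumptions, not part of the Pulsar API: `KeyValueSchemaIdCodec` is a hypothetical helper, and the 4-byte length fields are assumed to be big-endian (the default byte order of `ByteBuffer`), since the format description does not state the byte order.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: packs and unpacks a KeyValueSchemaID laid out as
// keySchemaIdLength(4) + keySchemaId + valueSchemaIdLength(4) + valueSchemaId.
final class KeyValueSchemaIdCodec {

    private KeyValueSchemaIdCodec() {
    }

    // Assumption: lengths are written as 4-byte big-endian integers.
    static byte[] encode(byte[] keySchemaId, byte[] valueSchemaId) {
        ByteBuffer buffer = ByteBuffer.allocate(
                Integer.BYTES + keySchemaId.length + Integer.BYTES + valueSchemaId.length);
        buffer.putInt(keySchemaId.length).put(keySchemaId);
        buffer.putInt(valueSchemaId.length).put(valueSchemaId);
        return buffer.array();
    }

    // Returns a two-element array: [0] is the key schema ID, [1] is the value schema ID.
    static byte[][] decode(byte[] keyValueSchemaId) {
        ByteBuffer buffer = ByteBuffer.wrap(keyValueSchemaId);
        byte[] keySchemaId = readLengthPrefixed(buffer);
        byte[] valueSchemaId = readLengthPrefixed(buffer);
        if (buffer.hasRemaining()) {
            throw new IllegalArgumentException("Trailing bytes after value schema ID");
        }
        return new byte[][] {keySchemaId, valueSchemaId};
    }

    // Reads one length-prefixed ID and validates the length against the remaining bytes.
    private static byte[] readLengthPrefixed(ByteBuffer buffer) {
        if (buffer.remaining() < Integer.BYTES) {
            throw new IllegalArgumentException("Missing schema ID length");
        }
        int length = buffer.getInt();
        if (length < 0 || length > buffer.remaining()) {
            throw new IllegalArgumentException("Invalid schema ID length: " + length);
        }
        byte[] id = new byte[length];
        buffer.get(id);
        return id;
    }
}
```

Validating each length before allocating guards against truncated or corrupted metadata, which matters because the bytes arrive from the wire rather than from trusted local code.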
This approach allows external schema systems to fully control schema evolution and versioning without being constrained by Pulsar’s native schema versioning mechanism. This may impact some components that rely on the schema version to deserialize messages, such as Pulsar Functions and Pulsar SQL. These components need to be updated to support setting schema properties, use the new schema type, and handle external schemas appropriately.
    public void workWithExternalSchemaRegistry() throws Exception {
        String topic = "testExternalJsonSchema";

        // Configuration passed to the external (Kafka-compatible) schema registry client.
        Map<String, String> configs = new HashedMap<>();
        configs.put("schema.registry.url", getSchemaRegistryUrl());
        configs.put("json.fail.unknown.properties", "false");

        KafkaSchemaFactory kafkaSchemaFactory = new KafkaSchemaFactory(configs);
        Schema<User> schema = kafkaSchemaFactory.json(User.class);

        @Cleanup
        PulsarClient pulsarClient = getPulsarClient();

        @Cleanup
        Producer<User> producer = pulsarClient.newProducer(schema)
                .topic(topic)
                .create();

        @Cleanup
        Consumer<User> consumer = pulsarClient.newConsumer(schema)
                .topic(topic)
                .subscriptionName("sub")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        for (int i = 0; i < 10; i++) {
            producer.send(new User("name-" + i, 10 + i));
        }

        for (int i = 0; i < 10; i++) {
            Message<User> message = consumer.receive();
            consumer.acknowledge(message);
            System.out.println("receive msg " + message.getValue().getClass().getName()
                    + " " + message.getValue());
        }
    }
    // File `SchemaRegistryFormat.proto`
    message SchemaInfo {
        enum SchemaType {
            EXTERNAL = 22;
        }
    }

    // File `PulsarApi.proto`
    message Schema {
        enum Type {
            External = 22;
        }
    }
Add a new field schema_id to the MessageMetadata to store the schema ID for messages that use external schemas.
    // File `PulsarApi.proto`
    message MessageMetadata {
        optional bytes schema_id = 32;
    }
Introduce a new SchemaType EXTERNAL to represent schemas that are managed by an external schema registry.
    public enum SchemaType {
        /**
         * External Schema Type.
         * <p>
         * This is used to indicate that the schema is managed externally, such as in a schema registry.
         * External schema type is not compatible with any other schema type.
         * </p>
         */
        EXTERNAL(21)
    }
Add a new class EncodeData to encapsulate the encoded data and schema ID.
    public class EncodeData {
        private byte[] data;
        private byte[] schemaId;
    }
Add a new method getSchemaId() to the Message interface to retrieve the schema ID of the message. This method returns the schema ID if the message was produced with an external schema; otherwise it returns an empty Optional.
    public interface Message<T> {
        /**
         * Get schema ID of the message.
         * PIP-420 provides a way to produce messages with external schema,
         * and the schema ID will be set to the message metadata.
         *
         * @return the schema ID if the message is produced with external schema and schema ID is set, otherwise empty.
         */
        Optional<byte[]> getSchemaId();
    }
Add two methods to encode and decode messages, which can be used by external schemas to serialize and deserialize messages. The encode method returns an EncodeData object that contains the encoded byte array and schema ID. Custom external schemas can set the SchemaInfoProvider and retrieve configs from it. The Schema interface also extends AutoCloseable so that external schema resources can be closed.
    import java.nio.ByteBuffer;
    import java.util.concurrent.CompletableFuture;

    public interface Schema<T> extends Cloneable {
        /**
         * Encodes the message into a byte array using the schema.
         *
         * @param topic the topic for which the message is being encoded
         * @param message the message to encode
         * @return the encoded byte array and schema ID
         * @throws SchemaSerializationException if the encoding fails
         */
        default EncodeData encode(String topic, T message) {
            return new EncodeData(encode(message), null);
        }

        /**
         * Decodes a ByteBuffer by copying its bytes and delegating to the byte array variant.
         */
        default T decode(String topic, ByteBuffer data, byte[] schemaId) {
            return decode(topic, getBytes(data), schemaId);
        }

        /**
         * Decodes a byte array into a message using the schema.
         *
         * @param topic the topic for which the message is being decoded
         * @param data the byte array to decode
         * @param schemaId the schema ID associated with the data
         * @return the decoded message
         * @throws SchemaSerializationException if the decoding fails
         */
        default T decode(String topic, byte[] data, byte[] schemaId) {
            return decode(data, schemaId);
        }

        /**
         * Releases resources held by the schema. The default implementation has nothing to close.
         */
        default CompletableFuture<Void> closeAsync() {
            return CompletableFuture.completedFuture(null);
        }
    }
To support using a third-party schema registry service in Pulsar Functions, the SchemaType.EXTERNAL schema type needs to be supported in Pulsar Functions.

Integrating third-party schema registry services introduces a new approach to managing schemas for geo-replicated topics.
In the current Pulsar architecture:
By using an external schema registry:
The new schema type SchemaType.EXTERNAL doesn’t break any existing Pulsar topics; it is not compatible with other Pulsar schema types.
Use the bytes schema for “external” schemas: it can’t provide any compatibility checks to protect topic data that uses Pulsar’s native schema system.